JingsongLi commented on code in PR #1443:
URL: https://github.com/apache/incubator-paimon/pull/1443#discussion_r1251619016


##########
paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java:
##########
@@ -290,19 +297,151 @@ public void createTag(String tagName, long 
fromSnapshotId) {
                 fromSnapshotId);
 
         Snapshot snapshot = snapshotManager.snapshot(fromSnapshotId);
-        TagManager tagManager = new TagManager(fileIO, path);
-        tagManager.createTag(snapshot, tagName);
+        tagManager().createTag(snapshot, tagName);
     }
 
     @Override
     public void deleteTag(String tagName) {
-        checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is 
blank.", tagName);
+        tagManager().deleteTag(tagName, store().newTagDeletion(), 
snapshotManager());
+    }
+
+    @Override
+    public void rollbackTo(String tagName) {
+        TagManager tagManager = tagManager();
+        checkArgument(tagManager.tagExists(tagName), "Rollback tag '%s' 
doesn't exist.", tagName);
 
-        TagManager tagManager = new TagManager(fileIO, path);
         Snapshot taggedSnapshot = tagManager.taggedSnapshot(tagName);
+        cleanLargerThan(taggedSnapshot);
+
+        try {
+            // it is possible that the earliest snapshot is later than the 
rollback tag because of
+            // snapshot expiration, in this case the `cleanLargerThan` method 
will delete all
+            // snapshots, so we should write the tag file to snapshot 
directory and modify the
+            // earliest hint
+            SnapshotManager snapshotManager = snapshotManager();
+            if (!snapshotManager.snapshotExists(taggedSnapshot.id())) {
+                fileIO.writeFileUtf8(
+                        snapshotManager().snapshotPath(taggedSnapshot.id()),
+                        fileIO.readFileUtf8(tagManager.tagPath(tagName)));
+                snapshotManager.commitEarliestHint(taggedSnapshot.id());
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    /**
+     * Clean snapshots and tags whose id is larger than given snapshot's in 
the reverse direction.
+     *
+     * <p>Snapshots are cleaned first because it is easier to collect their 
deletion skipping set.
+     */
+    private void cleanLargerThan(Snapshot snapshot) {

Review Comment:
   Move the `cleanLargerThan` method into a separate class, for example a
`RollbackHelper` class.



##########
paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java:
##########
@@ -224,46 +92,53 @@ void doDeleteExpiredDataFiles(
                 (path, pair) -> {
                     ManifestEntry entry = pair.getLeft();
                     // check whether we should skip the data file
-                    if (!dataFileSkipper.test(entry)) {
+                    if (!skipper.test(entry)) {
                         // delete data files
                         fileIO.deleteQuietly(path);
                         pair.getRight().forEach(fileIO::deleteQuietly);
-                        // record changed buckets
-                        deletionBuckets
-                                .computeIfAbsent(entry.partition(), p -> new 
HashSet<>())
-                                .add(entry.bucket());
+
+                        recordDeletionBuckets(entry);
                     }
                 });
     }
 
-    private Iterable<ManifestEntry> getManifestEntriesFromManifestList(String 
manifestListName) {
-        Queue<String> files =
-                tryReadManifestList(manifestListName).stream()
-                        .map(ManifestFileMeta::fileName)
-                        .collect(Collectors.toCollection(LinkedList::new));
-        return Iterables.concat(
-                (Iterable<Iterable<ManifestEntry>>)
-                        () ->
-                                new Iterator<Iterable<ManifestEntry>>() {
-                                    @Override
-                                    public boolean hasNext() {
-                                        return files.size() > 0;
-                                    }
+    /**
+     * Delete added file in the manifest list files. Added files marked as 
"ADD" in manifests.
+     *
+     * @param manifestListName name of manifest list
+     */
+    public void deleteAddedDataFiles(String manifestListName) {
+        for (ManifestEntry entry : tryReadManifestEntries(manifestListName)) {
+            if (entry.kind() == FileKind.ADD) {
+                fileIO.deleteQuietly(
+                        new Path(
+                                pathFactory.bucketPath(entry.partition(), 
entry.bucket()),
+                                entry.file().fileName()));
+                recordDeletionBuckets(entry);
+            }
+        }
+    }
 
-                                    @Override
-                                    public Iterable<ManifestEntry> next() {
-                                        String file = files.poll();
-                                        try {
-                                            return manifestFile.read(file);
-                                        } catch (Exception e) {
-                                            LOG.warn("Failed to read manifest 
file " + file, e);
-                                            return Collections.emptyList();
-                                        }
-                                    }
-                                });
+    private Iterable<ManifestEntry> tryReadManifestEntries(String 
manifestListName) {
+        return readManifestEntries(tryReadManifestList(manifestListName));
     }
 
-    public Set<String> collectManifestSkippingSet(Snapshot snapshot) {
-        return DeletionUtils.collectManifestSkippingSet(snapshot, 
manifestList, indexFileHandler);
+    /** Used to record which tag is cached in tagged snapshots list. */
+    private int cachedTagIndex = -1;
+
+    /** Used to cache data files used by current tag. */
+    private final Map<BinaryRow, Map<Integer, Set<String>>> cachedTagDataFiles 
= new HashMap<>();
+
+    public Predicate<ManifestEntry> dataFileSkipper(

Review Comment:
   Consider extracting `dataFileSkipper` into a separate class, for example `TagFileSkipper`.



##########
paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java:
##########
@@ -290,19 +297,151 @@ public void createTag(String tagName, long 
fromSnapshotId) {
                 fromSnapshotId);
 
         Snapshot snapshot = snapshotManager.snapshot(fromSnapshotId);
-        TagManager tagManager = new TagManager(fileIO, path);
-        tagManager.createTag(snapshot, tagName);
+        tagManager().createTag(snapshot, tagName);
     }
 
     @Override
     public void deleteTag(String tagName) {
-        checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is 
blank.", tagName);
+        tagManager().deleteTag(tagName, store().newTagDeletion(), 
snapshotManager());
+    }
+
+    @Override
+    public void rollbackTo(String tagName) {
+        TagManager tagManager = tagManager();
+        checkArgument(tagManager.tagExists(tagName), "Rollback tag '%s' 
doesn't exist.", tagName);
 
-        TagManager tagManager = new TagManager(fileIO, path);
         Snapshot taggedSnapshot = tagManager.taggedSnapshot(tagName);
+        cleanLargerThan(taggedSnapshot);
+
+        try {
+            // it is possible that the earliest snapshot is later than the 
rollback tag because of
+            // snapshot expiration, in this case the `cleanLargerThan` method 
will delete all
+            // snapshots, so we should write the tag file to snapshot 
directory and modify the
+            // earliest hint
+            SnapshotManager snapshotManager = snapshotManager();
+            if (!snapshotManager.snapshotExists(taggedSnapshot.id())) {
+                fileIO.writeFileUtf8(
+                        snapshotManager().snapshotPath(taggedSnapshot.id()),
+                        fileIO.readFileUtf8(tagManager.tagPath(tagName)));
+                snapshotManager.commitEarliestHint(taggedSnapshot.id());
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    /**
+     * Clean snapshots and tags whose id is larger than given snapshot's in 
the reverse direction.
+     *
+     * <p>Snapshots are cleaned first because it is easier to collect their 
deletion skipping set.
+     */
+    private void cleanLargerThan(Snapshot snapshot) {
+        // -------------------------------- prepare 
--------------------------------
+
+        SnapshotManager snapshotManager = snapshotManager();
+        TagManager tagManager = tagManager();
+
+        long snapshotId = snapshot.id();
+        List<Snapshot> toBeCleaned = new ArrayList<>();
+        List<Snapshot> skippedSnapshots = new ArrayList<>();
+
+        long earliest =
+                checkNotNull(
+                        snapshotManager.earliestSnapshotId(), "Cannot find 
earliest snapshot.");
+        long latest =
+                checkNotNull(snapshotManager.latestSnapshotId(), "Cannot find 
latest snapshot.");
+
+        // ---------------------------- clean snapshots 
----------------------------
+
+        // delete snapshot files first, cannot be read now
+        // it is possible that some snapshots have been expired
+        long to = Math.max(earliest, snapshotId + 1);
+        for (long i = latest; i >= to; i--) {
+            toBeCleaned.add(snapshotManager.snapshot(i));
+            fileIO().deleteQuietly(snapshotManager.snapshotPath(i));
+        }
+
+        SnapshotDeletion snapshotDeletion = store().newSnapshotDeletion();
+
+        // delete data files
+        // don't concern about tag data files because file deletion methods 
won't throw exception
+        // when deleting non-existing data files
+        for (Snapshot s : toBeCleaned) {
+            snapshotDeletion.deleteAddedDataFiles(s.deltaManifestList());
+            snapshotDeletion.deleteAddedDataFiles(s.changelogManifestList());
+        }
 
+        // delete directories
+        snapshotDeletion.cleanDataDirectories();
+
+        // delete manifest files
+        skippedSnapshots.add(snapshot);
+        // NOTE: must skip tag manifests because tag deletion will read them
+        List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
+        for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
+            Snapshot tag = taggedSnapshots.get(i);
+            if (tag.id() <= snapshotId) {
+                break;
+            }
+            skippedSnapshots.add(tag);
+        }
+        for (Snapshot s : toBeCleaned) {
+            snapshotDeletion.cleanUnusedManifests(s, skippedSnapshots);
+        }
+
+        // ------------------------------- clean tags 
-------------------------------
+
+        SortedMap<Snapshot, String> tags = tagManager.tags();
+        if (tags.isEmpty()) {
+            return;
+        }
+
+        // delete tag files and collect skipping set
+        skippedSnapshots.clear();
+        toBeCleaned.clear();
+        for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {

Review Comment:
   `skippedSnapshots` should just be a single snapshot, not a list.



##########
paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java:
##########
@@ -290,19 +297,151 @@ public void createTag(String tagName, long 
fromSnapshotId) {
                 fromSnapshotId);
 
         Snapshot snapshot = snapshotManager.snapshot(fromSnapshotId);
-        TagManager tagManager = new TagManager(fileIO, path);
-        tagManager.createTag(snapshot, tagName);
+        tagManager().createTag(snapshot, tagName);
     }
 
     @Override
     public void deleteTag(String tagName) {
-        checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is 
blank.", tagName);
+        tagManager().deleteTag(tagName, store().newTagDeletion(), 
snapshotManager());
+    }
+
+    @Override
+    public void rollbackTo(String tagName) {
+        TagManager tagManager = tagManager();
+        checkArgument(tagManager.tagExists(tagName), "Rollback tag '%s' 
doesn't exist.", tagName);
 
-        TagManager tagManager = new TagManager(fileIO, path);
         Snapshot taggedSnapshot = tagManager.taggedSnapshot(tagName);
+        cleanLargerThan(taggedSnapshot);
+
+        try {
+            // it is possible that the earliest snapshot is later than the 
rollback tag because of
+            // snapshot expiration, in this case the `cleanLargerThan` method 
will delete all
+            // snapshots, so we should write the tag file to snapshot 
directory and modify the
+            // earliest hint
+            SnapshotManager snapshotManager = snapshotManager();
+            if (!snapshotManager.snapshotExists(taggedSnapshot.id())) {
+                fileIO.writeFileUtf8(
+                        snapshotManager().snapshotPath(taggedSnapshot.id()),
+                        fileIO.readFileUtf8(tagManager.tagPath(tagName)));
+                snapshotManager.commitEarliestHint(taggedSnapshot.id());
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    /**
+     * Clean snapshots and tags whose id is larger than given snapshot's in 
the reverse direction.
+     *
+     * <p>Snapshots are cleaned first because it is easier to collect their 
deletion skipping set.
+     */
+    private void cleanLargerThan(Snapshot snapshot) {
+        // -------------------------------- prepare 
--------------------------------
+
+        SnapshotManager snapshotManager = snapshotManager();
+        TagManager tagManager = tagManager();
+
+        long snapshotId = snapshot.id();
+        List<Snapshot> toBeCleaned = new ArrayList<>();
+        List<Snapshot> skippedSnapshots = new ArrayList<>();
+
+        long earliest =
+                checkNotNull(
+                        snapshotManager.earliestSnapshotId(), "Cannot find 
earliest snapshot.");
+        long latest =
+                checkNotNull(snapshotManager.latestSnapshotId(), "Cannot find 
latest snapshot.");
+
+        // ---------------------------- clean snapshots 
----------------------------
+
+        // delete snapshot files first, cannot be read now
+        // it is possible that some snapshots have been expired
+        long to = Math.max(earliest, snapshotId + 1);
+        for (long i = latest; i >= to; i--) {
+            toBeCleaned.add(snapshotManager.snapshot(i));
+            fileIO().deleteQuietly(snapshotManager.snapshotPath(i));
+        }
+
+        SnapshotDeletion snapshotDeletion = store().newSnapshotDeletion();
+
+        // delete data files
+        // don't concern about tag data files because file deletion methods 
won't throw exception
+        // when deleting non-existing data files
+        for (Snapshot s : toBeCleaned) {
+            snapshotDeletion.deleteAddedDataFiles(s.deltaManifestList());
+            snapshotDeletion.deleteAddedDataFiles(s.changelogManifestList());
+        }
 
+        // delete directories
+        snapshotDeletion.cleanDataDirectories();
+
+        // delete manifest files
+        skippedSnapshots.add(snapshot);
+        // NOTE: must skip tag manifests because tag deletion will read them

Review Comment:
   Why is this separate step needed? You can handle it in a single, unified loop.



##########
paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java:
##########
@@ -77,6 +80,40 @@ public void createTag(Snapshot snapshot, String tagName) {
         }
     }
 
+    public void deleteTag(
+            String tagName, TagDeletion tagDeletion, SnapshotManager 
snapshotManager) {
+        checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is 
blank.", tagName);
+        checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName);
+
+        Snapshot taggedSnapshot = taggedSnapshot(tagName);
+        List<Snapshot> taggedSnapshots = taggedSnapshots();

Review Comment:
   Move the `taggedSnapshots()` call down, after the `if (snapshotManager.snapshotExists(...))` check, so it is not computed when the method returns early.



##########
paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java:
##########
@@ -77,6 +80,40 @@ public void createTag(Snapshot snapshot, String tagName) {
         }
     }
 
+    public void deleteTag(
+            String tagName, TagDeletion tagDeletion, SnapshotManager 
snapshotManager) {
+        checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is 
blank.", tagName);
+        checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName);
+
+        Snapshot taggedSnapshot = taggedSnapshot(tagName);
+        List<Snapshot> taggedSnapshots = taggedSnapshots();
+        fileIO.deleteQuietly(tagPath(tagName));
+
+        // skip file deletion if snapshot exists
+        if (snapshotManager.snapshotExists(taggedSnapshot.id())) {
+            return;
+        }
+
+        // collect skipping sets from the earliest snapshot and neighbor tags
+        List<Snapshot> skippedSnapshots = new ArrayList<>();
+        skippedSnapshots.add(snapshotManager.earliestSnapshot());

Review Comment:
   Only one or two snapshots need to be added here, never three.



##########
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java:
##########
@@ -174,32 +169,26 @@ public void expireUntil(long earliestId, long 
endExclusiveId) {
             }
             Snapshot snapshot = snapshotManager.snapshot(id);
             if (snapshot.changelogManifestList() != null) {
-                snapshotDeletion.deleteAddedDataFiles(
-                        snapshot.changelogManifestList(), deletionBuckets);
+                
snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList());
             }
         }
 
         // data files and changelog files in bucket directories has been 
deleted
         // then delete changed bucket directories if they are empty
-        snapshotDeletion.tryDeleteDirectories(deletionBuckets);
+        snapshotDeletion.cleanDataDirectories();
 
         // delete manifests and indexFiles
-        Set<String> skipManifestFiles =
-                snapshotDeletion.collectManifestSkippingSet(
-                        snapshotManager.snapshot(endExclusiveId));
-        for (Snapshot snapshot :
-                tagFileKeeper.findOverlappedSnapshots(beginInclusiveId, 
endExclusiveId)) {
-            skipManifestFiles.add(snapshot.baseManifestList());
-            skipManifestFiles.add(snapshot.deltaManifestList());
-            
skipManifestFiles.addAll(snapshotDeletion.collectManifestSkippingSet(snapshot));
-        }
+        List<Snapshot> skippingSnapshots =

Review Comment:
   Changing `Set<String>` to `List<Snapshot>` introduces a performance issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to