This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b87538522 [core] Fix that tag deletion might delete used data files (#2409)
b87538522 is described below

commit b875385229cc08ba6d82b727ed8ca4652695c98f
Author: yuzelin <[email protected]>
AuthorDate: Wed Nov 29 12:23:30 2023 +0800

    [core] Fix that tag deletion might delete used data files (#2409)
---
 .../apache/paimon/operation/FileDeletionBase.java  | 28 ++++++------
 .../org/apache/paimon/operation/TagDeletion.java   | 52 ++++++++++------------
 .../java/org/apache/paimon/utils/ObjectsFile.java  | 34 +++++++++-----
 .../paimon/operation/FileStoreExpireTestBase.java  |  9 +++-
 .../operation/UncleanedFileStoreExpireTest.java    | 41 +++++++++++++++++
 5 files changed, 108 insertions(+), 56 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index a26bf7b5a..e2c094ea0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -229,28 +229,28 @@ public abstract class FileDeletionBase {
      */
     protected void addMergedDataFiles(
             Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, Snapshot 
snapshot)
-            throws Exception {
+            throws IOException {
+        for (ManifestEntry entry : readMergedDataFiles(snapshot)) {
+            dataFiles
+                    .computeIfAbsent(entry.partition(), p -> new HashMap<>())
+                    .computeIfAbsent(entry.bucket(), b -> new HashSet<>())
+                    .add(entry.file().fileName());
+        }
+    }
+
+    protected Collection<ManifestEntry> readMergedDataFiles(Snapshot snapshot) 
throws IOException {
         // read data manifests
         List<String> files = tryReadDataManifests(snapshot);
 
-        // try merging
+        // read and merge manifest entries
         Map<ManifestEntry.Identifier, ManifestEntry> map = new HashMap<>();
-        for (String file : files) {
+        for (String manifest : files) {
             List<ManifestEntry> entries;
-            try {
-                entries = manifestFile.read(file);
-            } catch (Exception e) {
-                throw new Exception("Failed to read manifest file " + file, e);
-            }
+            entries = manifestFile.readWithIOException(manifest);
             ManifestEntry.mergeEntries(entries, map);
         }
 
-        for (ManifestEntry entry : map.values()) {
-            dataFiles
-                    .computeIfAbsent(entry.partition(), p -> new HashMap<>())
-                    .computeIfAbsent(entry.bucket(), b -> new HashSet<>())
-                    .add(entry.file().fileName());
-        }
+        return map.values();
     }
 
     protected boolean containsDataFile(
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
index 09531e75c..96b0a3ee6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
@@ -31,6 +31,8 @@ import org.apache.paimon.utils.FileStorePathFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -55,43 +57,35 @@ public class TagDeletion extends FileDeletionBase {
 
     @Override
     public void cleanUnusedDataFiles(Snapshot taggedSnapshot, 
Predicate<ManifestEntry> skipper) {
-        cleanUnusedDataFiles(tryReadDataManifests(taggedSnapshot), skipper);
-    }
-
-    @Override
-    public void cleanUnusedManifests(Snapshot taggedSnapshot, Set<String> 
skippingSet) {
-        // doesn't clean changelog files because they are handled by 
SnapshotDeletion
-        cleanUnusedManifests(taggedSnapshot, skippingSet, false);
-    }
+        Collection<ManifestEntry> manifestEntries;
+        try {
+            manifestEntries = readMergedDataFiles(taggedSnapshot);
+        } catch (IOException e) {
+            LOG.info("Skip data file clean for the tag of id {}.", 
taggedSnapshot.id(), e);
+            return;
+        }
 
-    private void cleanUnusedDataFiles(
-            List<String> manifestFileNames, Predicate<ManifestEntry> skipper) {
         Set<Path> dataFileToDelete = new HashSet<>();
-        for (String manifest : manifestFileNames) {
-            List<ManifestEntry> manifestEntries;
-            try {
-                manifestEntries = manifestFile.read(manifest);
-            } catch (Exception e) {
-                // We want to delete the data file, so just ignore the 
unavailable files
-                LOG.info("Failed to read manifest " + manifest + ". Ignore 
it.", e);
-                continue;
-            }
-
-            for (ManifestEntry entry : manifestEntries) {
-                if (!skipper.test(entry)) {
-                    Path bucketPath = 
pathFactory.bucketPath(entry.partition(), entry.bucket());
-                    dataFileToDelete.add(new Path(bucketPath, 
entry.file().fileName()));
-                    for (String file : entry.file().extraFiles()) {
-                        dataFileToDelete.add(new Path(bucketPath, file));
-                    }
-
-                    recordDeletionBuckets(entry);
+        for (ManifestEntry entry : manifestEntries) {
+            if (!skipper.test(entry)) {
+                Path bucketPath = pathFactory.bucketPath(entry.partition(), 
entry.bucket());
+                dataFileToDelete.add(new Path(bucketPath, 
entry.file().fileName()));
+                for (String file : entry.file().extraFiles()) {
+                    dataFileToDelete.add(new Path(bucketPath, file));
                 }
+
+                recordDeletionBuckets(entry);
             }
         }
         deleteFiles(dataFileToDelete, fileIO::deleteQuietly);
     }
 
+    @Override
+    public void cleanUnusedManifests(Snapshot taggedSnapshot, Set<String> 
skippingSet) {
+        // doesn't clean changelog files because they are handled by 
SnapshotDeletion
+        cleanUnusedManifests(taggedSnapshot, skippingSet, false);
+    }
+
     public Predicate<ManifestEntry> dataFileSkipper(Snapshot fromSnapshot) 
throws Exception {
         return dataFileSkipper(Collections.singletonList(fromSnapshot));
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
index 6ccd9111b..51b504926 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
@@ -78,6 +78,10 @@ public abstract class ObjectsFile<T> {
         return read(fileName, Filter.alwaysTrue(), Filter.alwaysTrue());
     }
 
+    public List<T> readWithIOException(String fileName) throws IOException {
+        return readWithIOException(fileName, Filter.alwaysTrue(), 
Filter.alwaysTrue());
+    }
+
     public boolean exists(String fileName) {
         try {
             return fileIO.exists(pathFactory.toPath(fileName));
@@ -89,23 +93,29 @@ public abstract class ObjectsFile<T> {
     public List<T> read(
             String fileName, Filter<InternalRow> loadFilter, 
Filter<InternalRow> readFilter) {
         try {
-            if (cache != null) {
-                return cache.read(fileName, loadFilter, readFilter);
-            }
-
-            RecordReader<InternalRow> reader =
-                    createFormatReader(fileIO, readerFactory, 
pathFactory.toPath(fileName));
-            if (readFilter != Filter.ALWAYS_TRUE) {
-                reader = reader.filter(readFilter);
-            }
-            List<T> result = new ArrayList<>();
-            reader.forEachRemaining(row -> 
result.add(serializer.fromRow(row)));
-            return result;
+            return readWithIOException(fileName, loadFilter, readFilter);
         } catch (IOException e) {
             throw new RuntimeException("Failed to read manifest list " + 
fileName, e);
         }
     }
 
+    public List<T> readWithIOException(
+            String fileName, Filter<InternalRow> loadFilter, 
Filter<InternalRow> readFilter)
+            throws IOException {
+        if (cache != null) {
+            return cache.read(fileName, loadFilter, readFilter);
+        }
+
+        RecordReader<InternalRow> reader =
+                createFormatReader(fileIO, readerFactory, 
pathFactory.toPath(fileName));
+        if (readFilter != Filter.ALWAYS_TRUE) {
+            reader = reader.filter(readFilter);
+        }
+        List<T> result = new ArrayList<>();
+        reader.forEachRemaining(row -> result.add(serializer.fromRow(row)));
+        return result;
+    }
+
     public String writeWithoutRolling(Collection<T> records) {
         return writeWithoutRolling(records.iterator());
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java
index b5c3f7c34..ad1ef4c33 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java
@@ -110,11 +110,18 @@ public class FileStoreExpireTestBase {
     protected void assertSnapshot(
             int snapshotId, List<KeyValue> allData, List<Integer> 
snapshotPositions)
             throws Exception {
+        assertSnapshot(snapshotManager.snapshot(snapshotId), allData, 
snapshotPositions);
+    }
+
+    protected void assertSnapshot(
+            Snapshot snapshot, List<KeyValue> allData, List<Integer> 
snapshotPositions)
+            throws Exception {
+        int snapshotId = (int) snapshot.id();
         Map<BinaryRow, BinaryRow> expected =
                 store.toKvMap(allData.subList(0, 
snapshotPositions.get(snapshotId - 1)));
         List<KeyValue> actualKvs =
                 store.readKvsFromManifestEntries(
-                        
store.newScan().withSnapshot(snapshotId).plan().files(), false);
+                        store.newScan().withSnapshot(snapshot).plan().files(), 
false);
         gen.sort(actualKvs);
         Map<BinaryRow, BinaryRow> actual = store.toKvMap(actualKvs);
         assertThat(actual).isEqualTo(expected);
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
index ef542e3f1..ecaf5ae8e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
@@ -19,7 +19,9 @@
 package org.apache.paimon.operation;
 
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.TagManager;
 
 import org.junit.jupiter.api.Test;
 
@@ -27,6 +29,7 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
@@ -80,4 +83,42 @@ public class UncleanedFileStoreExpireTest extends 
FileStoreExpireTestBase {
         assertThat(snapshotManager.snapshotExists(latestSnapshotId)).isTrue();
         assertSnapshot(latestSnapshotId, allData, snapshotPositions);
     }
+
+    @Test
+    public void testMixedSnapshotAndTagDeletion() throws Exception {
+        List<KeyValue> allData = new ArrayList<>();
+        List<Integer> snapshotPositions = new ArrayList<>();
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        commit(random.nextInt(10) + 30, allData, snapshotPositions);
+        int latestSnapshotId = snapshotManager.latestSnapshotId().intValue();
+        TagManager tagManager = store.newTagManager();
+
+        // create tags for each snapshot
+        for (int id = 1; id <= latestSnapshotId; id++) {
+            Snapshot snapshot = snapshotManager.snapshot(id);
+            tagManager.createTag(snapshot, "tag" + id);
+        }
+
+        // randomly expire snapshots
+        int expired = random.nextInt(latestSnapshotId / 2) + 1;
+        int retained = latestSnapshotId - expired;
+        store.newExpire(retained, retained, Long.MAX_VALUE).expire();
+
+        // randomly delete tags
+        for (int id = 1; id <= latestSnapshotId; id++) {
+            if (random.nextBoolean()) {
+                tagManager.deleteTag("tag" + id, store.newTagDeletion(), 
snapshotManager);
+            }
+        }
+
+        // check snapshots and tags
+        Set<Snapshot> allSnapshots = new HashSet<>();
+        snapshotManager.snapshots().forEachRemaining(allSnapshots::add);
+        allSnapshots.addAll(tagManager.taggedSnapshots());
+
+        for (Snapshot snapshot : allSnapshots) {
+            assertSnapshot(snapshot, allData, snapshotPositions);
+        }
+    }
 }

Reply via email to