This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1107f2b05 [core] FileDeletionBase should cancel reading manifest files when any file fails to be read (#2105)
1107f2b05 is described below

commit 1107f2b0521bc582370528479344d18eb63cfa03
Author: yuzelin <[email protected]>
AuthorDate: Tue Oct 10 22:00:16 2023 +0800

    [core] FileDeletionBase should cancel reading manifest files when any file fails to be read (#2105)
---
 .../apache/paimon/operation/FileDeletionBase.java  |  69 ++++++-------
 .../paimon/operation/FileStoreExpireImpl.java      |  17 +++-
 .../apache/paimon/operation/SnapshotDeletion.java  |  62 ++++++++++--
 .../org/apache/paimon/operation/TagDeletion.java   |  46 ++++++---
 .../org/apache/paimon/table/RollbackHelper.java    |  26 ++++-
 .../java/org/apache/paimon/utils/TagManager.java   |  24 ++++-
 .../operation/CleanedFileStoreExpireTest.java      |   2 +-
 .../apache/paimon/operation/FileDeletionTest.java  | 108 +++++++++++++++++++++
 8 files changed, 277 insertions(+), 77 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index 8de8dfa85..a26bf7b5a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -32,8 +32,6 @@ import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.FileUtils;
 
-import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,11 +41,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -215,50 +211,41 @@ public abstract class FileDeletionBase {
         }
     }
 
-    public Iterable<ManifestEntry> tryReadManifestEntries(String 
manifestListName) {
-        return readManifestEntries(tryReadManifestList(manifestListName));
-    }
-
-    /** Try to read base and delta manifest lists at one time. */
-    protected Iterable<ManifestEntry> tryReadDataManifestEntries(Snapshot 
snapshot) {
+    protected List<String> tryReadDataManifests(Snapshot snapshot) {
         List<ManifestFileMeta> manifestFileMetas = 
tryReadManifestList(snapshot.baseManifestList());
         
manifestFileMetas.addAll(tryReadManifestList(snapshot.deltaManifestList()));
-
-        return readManifestEntries(manifestFileMetas);
+        return readManifestFileNames(manifestFileMetas);
     }
 
-    protected Iterable<ManifestEntry> readManifestEntries(
-            List<ManifestFileMeta> manifestFileMetas) {
-        Queue<String> files =
-                manifestFileMetas.stream()
-                        .map(ManifestFileMeta::fileName)
-                        .collect(Collectors.toCollection(LinkedList::new));
-        return Iterables.concat(
-                (Iterable<Iterable<ManifestEntry>>)
-                        () ->
-                                new Iterator<Iterable<ManifestEntry>>() {
-                                    @Override
-                                    public boolean hasNext() {
-                                        return files.size() > 0;
-                                    }
-
-                                    @Override
-                                    public Iterable<ManifestEntry> next() {
-                                        String file = files.poll();
-                                        try {
-                                            return manifestFile.read(file);
-                                        } catch (Exception e) {
-                                            LOG.warn("Failed to read manifest 
file " + file, e);
-                                            return Collections.emptyList();
-                                        }
-                                    }
-                                });
+    protected List<String> readManifestFileNames(List<ManifestFileMeta> 
manifestFileMetas) {
+        return manifestFileMetas.stream()
+                .map(ManifestFileMeta::fileName)
+                .collect(Collectors.toCollection(LinkedList::new));
     }
 
+    /**
+     * NOTE: This method is used for building the data file skipping set. If some manifests
+     * cannot be read, it throws an exception which callers must handle.
+     */
     protected void addMergedDataFiles(
-            Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, Snapshot 
snapshot) {
-        Iterable<ManifestEntry> entries = tryReadDataManifestEntries(snapshot);
-        for (ManifestEntry entry : ManifestEntry.mergeEntries(entries)) {
+            Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, Snapshot 
snapshot)
+            throws Exception {
+        // read data manifests
+        List<String> files = tryReadDataManifests(snapshot);
+
+        // try merging
+        Map<ManifestEntry.Identifier, ManifestEntry> map = new HashMap<>();
+        for (String file : files) {
+            List<ManifestEntry> entries;
+            try {
+                entries = manifestFile.read(file);
+            } catch (Exception e) {
+                throw new Exception("Failed to read manifest file " + file, e);
+            }
+            ManifestEntry.mergeEntries(entries, map);
+        }
+
+        for (ManifestEntry entry : map.values()) {
             dataFiles
                     .computeIfAbsent(entry.partition(), p -> new HashMap<>())
                     .computeIfAbsent(entry.bucket(), b -> new HashSet<>())
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
index 8b177bdf9..ab6bb5d09 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.consumer.ConsumerManager;
+import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
@@ -33,6 +34,7 @@ import java.util.List;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.function.Predicate;
 
 /**
  * Default implementation of {@link FileStoreExpire}. It retains a certain 
number or period of
@@ -178,8 +180,19 @@ public class FileStoreExpireImpl implements 
FileStoreExpire {
             }
             Snapshot snapshot = snapshotManager.snapshot(id);
             // expire merge tree files and collect changed buckets
-            snapshotDeletion.cleanUnusedDataFiles(
-                    snapshot, 
snapshotDeletion.dataFileSkipper(taggedSnapshots, id));
+            Predicate<ManifestEntry> skipper;
+            try {
+                skipper = snapshotDeletion.dataFileSkipper(taggedSnapshots, 
id);
+            } catch (Exception e) {
+                LOG.info(
+                        String.format(
+                                "Skip cleaning data files of snapshot '%s' due 
to failed to build skipping set.",
+                                id),
+                        e);
+                continue;
+            }
+
+            snapshotDeletion.cleanUnusedDataFiles(snapshot, skipper);
         }
 
         // delete changelog files
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
index 8b9740c33..084845a50 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
@@ -32,6 +32,9 @@ import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.TagManager;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -42,6 +45,8 @@ import java.util.function.Predicate;
 /** Delete snapshot files. */
 public class SnapshotDeletion extends FileDeletionBase {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(SnapshotDeletion.class);
+
     /** Used to record which tag is cached in tagged snapshots list. */
     private int cachedTagIndex = -1;
 
@@ -59,7 +64,25 @@ public class SnapshotDeletion extends FileDeletionBase {
 
     @Override
     public void cleanUnusedDataFiles(Snapshot snapshot, 
Predicate<ManifestEntry> skipper) {
-        
doCleanUnusedDataFile(tryReadManifestEntries(snapshot.deltaManifestList()), 
skipper);
+        // try read manifests
+        List<String> manifestFileNames =
+                
readManifestFileNames(tryReadManifestList(snapshot.deltaManifestList()));
+        List<ManifestEntry> manifestEntries = new ArrayList<>();
+        // data file path -> (original manifest entry, extra file paths)
+        Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete = new 
HashMap<>();
+        for (String manifest : manifestFileNames) {
+            try {
+                manifestEntries = manifestFile.read(manifest);
+            } catch (Exception e) {
+                // cancel deletion if any exception occurs
+                LOG.warn("Failed to read some manifest files. Cancel 
deletion.", e);
+                return;
+            }
+
+            getDataFileToDelete(dataFileToDelete, manifestEntries);
+        }
+
+        doCleanUnusedDataFile(dataFileToDelete, skipper);
     }
 
     @Override
@@ -67,14 +90,12 @@ public class SnapshotDeletion extends FileDeletionBase {
         cleanUnusedManifests(snapshot, skippingSet, true);
     }
 
-    @VisibleForTesting
-    void doCleanUnusedDataFile(
-            Iterable<ManifestEntry> dataFileLog, Predicate<ManifestEntry> 
skipper) {
+    private void getDataFileToDelete(
+            Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete,
+            List<ManifestEntry> dataFileEntries) {
         // we cannot delete a data file directly when we meet a DELETE entry, 
because that
         // file might be upgraded
-        // data file path -> (original manifest entry, extra file paths)
-        Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete = new 
HashMap<>();
-        for (ManifestEntry entry : dataFileLog) {
+        for (ManifestEntry entry : dataFileEntries) {
             Path bucketPath = pathFactory.bucketPath(entry.partition(), 
entry.bucket());
             Path dataFilePath = new Path(bucketPath, entry.file().fileName());
             switch (entry.kind()) {
@@ -93,7 +114,11 @@ public class SnapshotDeletion extends FileDeletionBase {
                             "Unknown value kind " + entry.kind().name());
             }
         }
+    }
 
+    private void doCleanUnusedDataFile(
+            Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete,
+            Predicate<ManifestEntry> skipper) {
         List<Path> actualDataFileToDelete = new ArrayList<>();
         dataFileToDelete.forEach(
                 (path, pair) -> {
@@ -110,16 +135,33 @@ public class SnapshotDeletion extends FileDeletionBase {
         deleteFiles(actualDataFileToDelete, fileIO::deleteQuietly);
     }
 
+    @VisibleForTesting
+    void cleanUnusedDataFile(List<ManifestEntry> dataFileLog) {
+        Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete = new 
HashMap<>();
+        getDataFileToDelete(dataFileToDelete, dataFileLog);
+        doCleanUnusedDataFile(dataFileToDelete, f -> false);
+    }
+
     /**
      * Delete added file in the manifest list files. Added files marked as 
"ADD" in manifests.
      *
      * @param manifestListName name of manifest list
      */
     public void deleteAddedDataFiles(String manifestListName) {
-        deleteAddedDataFiles(tryReadManifestEntries(manifestListName));
+        List<String> manifestFileNames =
+                readManifestFileNames(tryReadManifestList(manifestListName));
+        for (String file : manifestFileNames) {
+            try {
+                List<ManifestEntry> manifestEntries = manifestFile.read(file);
+                deleteAddedDataFiles(manifestEntries);
+            } catch (Exception e) {
+                // We want to delete the data file, so just ignore the 
unavailable files
+                LOG.info("Failed to read manifest " + file + ". Ignore it.", 
e);
+            }
+        }
     }
 
-    public void deleteAddedDataFiles(Iterable<ManifestEntry> manifestEntries) {
+    private void deleteAddedDataFiles(List<ManifestEntry> manifestEntries) {
         List<Path> dataFileToDelete = new ArrayList<>();
         for (ManifestEntry entry : manifestEntries) {
             if (entry.kind() == FileKind.ADD) {
@@ -134,7 +176,7 @@ public class SnapshotDeletion extends FileDeletionBase {
     }
 
     public Predicate<ManifestEntry> dataFileSkipper(
-            List<Snapshot> taggedSnapshots, long expiringSnapshotId) {
+            List<Snapshot> taggedSnapshots, long expiringSnapshotId) throws 
Exception {
         int index = TagManager.findPreviousTag(taggedSnapshots, 
expiringSnapshotId);
         // refresh tag data files
         if (index >= 0 && cachedTagIndex != index) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
index 77e3d7c70..09531e75c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
@@ -28,9 +28,12 @@ import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.utils.FileStorePathFactory;
 
-import java.util.ArrayList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -39,6 +42,8 @@ import java.util.function.Predicate;
 /** Delete tag files. */
 public class TagDeletion extends FileDeletionBase {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(TagDeletion.class);
+
     public TagDeletion(
             FileIO fileIO,
             FileStorePathFactory pathFactory,
@@ -50,7 +55,7 @@ public class TagDeletion extends FileDeletionBase {
 
     @Override
     public void cleanUnusedDataFiles(Snapshot taggedSnapshot, 
Predicate<ManifestEntry> skipper) {
-        cleanUnusedDataFiles(tryReadDataManifestEntries(taggedSnapshot), 
skipper);
+        cleanUnusedDataFiles(tryReadDataManifests(taggedSnapshot), skipper);
     }
 
     @Override
@@ -59,28 +64,39 @@ public class TagDeletion extends FileDeletionBase {
         cleanUnusedManifests(taggedSnapshot, skippingSet, false);
     }
 
-    public void cleanUnusedDataFiles(
-            Iterable<ManifestEntry> entries, Predicate<ManifestEntry> skipper) 
{
-        List<Path> dataFileToDelete = new ArrayList<>();
-        for (ManifestEntry entry : ManifestEntry.mergeEntries(entries)) {
-            if (!skipper.test(entry)) {
-                Path bucketPath = pathFactory.bucketPath(entry.partition(), 
entry.bucket());
-                dataFileToDelete.add(new Path(bucketPath, 
entry.file().fileName()));
-                for (String file : entry.file().extraFiles()) {
-                    dataFileToDelete.add(new Path(bucketPath, file));
-                }
+    private void cleanUnusedDataFiles(
+            List<String> manifestFileNames, Predicate<ManifestEntry> skipper) {
+        Set<Path> dataFileToDelete = new HashSet<>();
+        for (String manifest : manifestFileNames) {
+            List<ManifestEntry> manifestEntries;
+            try {
+                manifestEntries = manifestFile.read(manifest);
+            } catch (Exception e) {
+                // We want to delete the data file, so just ignore the 
unavailable files
+                LOG.info("Failed to read manifest " + manifest + ". Ignore 
it.", e);
+                continue;
+            }
+
+            for (ManifestEntry entry : manifestEntries) {
+                if (!skipper.test(entry)) {
+                    Path bucketPath = 
pathFactory.bucketPath(entry.partition(), entry.bucket());
+                    dataFileToDelete.add(new Path(bucketPath, 
entry.file().fileName()));
+                    for (String file : entry.file().extraFiles()) {
+                        dataFileToDelete.add(new Path(bucketPath, file));
+                    }
 
-                recordDeletionBuckets(entry);
+                    recordDeletionBuckets(entry);
+                }
             }
         }
         deleteFiles(dataFileToDelete, fileIO::deleteQuietly);
     }
 
-    public Predicate<ManifestEntry> dataFileSkipper(Snapshot fromSnapshot) {
+    public Predicate<ManifestEntry> dataFileSkipper(Snapshot fromSnapshot) 
throws Exception {
         return dataFileSkipper(Collections.singletonList(fromSnapshot));
     }
 
-    public Predicate<ManifestEntry> dataFileSkipper(List<Snapshot> 
fromSnapshots) {
+    public Predicate<ManifestEntry> dataFileSkipper(List<Snapshot> 
fromSnapshots) throws Exception {
         Map<BinaryRow, Map<Integer, Set<String>>> skipped = new HashMap<>();
         for (Snapshot snapshot : fromSnapshots) {
             addMergedDataFiles(skipped, snapshot);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java 
b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
index 4843393e7..a1d5333e4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
@@ -26,6 +26,9 @@ import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
@@ -40,6 +43,8 @@ import static 
org.apache.paimon.utils.Preconditions.checkNotNull;
 /** Helper class for {@link Table#rollbackTo} including utils to clean 
snapshots. */
 public class RollbackHelper {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(RollbackHelper.class);
+
     private final SnapshotManager snapshotManager;
     private final TagManager tagManager;
     private final FileIO fileIO;
@@ -136,13 +141,24 @@ public class RollbackHelper {
         }
 
         // delete data files
-        Predicate<ManifestEntry> dataFileSkipper = 
tagDeletion.dataFileSkipper(retainedSnapshot);
-        for (Snapshot s : toBeCleaned) {
-            tagDeletion.cleanUnusedDataFiles(s, dataFileSkipper);
+        Predicate<ManifestEntry> dataFileSkipper = null;
+        boolean success = true;
+        try {
+            dataFileSkipper = tagDeletion.dataFileSkipper(retainedSnapshot);
+        } catch (Exception e) {
+            LOG.info(
+                    "Skip cleaning data files for deleted tags due to failed 
to build skipping set.",
+                    e);
+            success = false;
         }
 
-        // delete directories
-        tagDeletion.cleanDataDirectories();
+        if (success) {
+            for (Snapshot s : toBeCleaned) {
+                tagDeletion.cleanUnusedDataFiles(s, dataFileSkipper);
+            }
+            // delete directories
+            tagDeletion.cleanDataDirectories();
+        }
 
         return toBeCleaned;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 46e62b014..8a46b5f24 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -24,6 +24,9 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.operation.TagDeletion;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -38,6 +41,8 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
 /** Manager for {@code Tag}. */
 public class TagManager {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(TagManager.class);
+
     private static final String TAG_PREFIX = "tag-";
 
     private final FileIO fileIO;
@@ -116,9 +121,22 @@ public class TagManager {
         skippedSnapshots.add(right);
 
         // delete data files and empty directories
-        Predicate<ManifestEntry> dataFileSkipper = 
tagDeletion.dataFileSkipper(skippedSnapshots);
-        tagDeletion.cleanUnusedDataFiles(taggedSnapshot, dataFileSkipper);
-        tagDeletion.cleanDataDirectories();
+        Predicate<ManifestEntry> dataFileSkipper = null;
+        boolean success = true;
+        try {
+            dataFileSkipper = tagDeletion.dataFileSkipper(skippedSnapshots);
+        } catch (Exception e) {
+            LOG.info(
+                    String.format(
+                            "Skip cleaning data files of tag '%s' due to 
failed to build skipping set.",
+                            tagName),
+                    e);
+            success = false;
+        }
+        if (success) {
+            tagDeletion.cleanUnusedDataFiles(taggedSnapshot, dataFileSkipper);
+            tagDeletion.cleanDataDirectories();
+        }
 
         // delete manifests
         tagDeletion.cleanUnusedManifests(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
index 91eb0b01c..1d5eb7329 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
@@ -88,7 +88,7 @@ public class CleanedFileStoreExpireTest extends 
FileStoreExpireTestBase {
         ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition, 
0, 1, dataFile);
 
         // expire
-        expire.snapshotDeletion().doCleanUnusedDataFile(Arrays.asList(add, 
delete), f -> false);
+        expire.snapshotDeletion().cleanUnusedDataFile(Arrays.asList(add, 
delete));
 
         // check
         assertThat(fileIO.exists(myDataFile)).isFalse();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index ede56ff14..d150ca457 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -30,6 +30,7 @@ import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
@@ -45,6 +46,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -500,6 +503,111 @@ public class FileDeletionTest {
         }
     }
 
+    /**
+     * When a data file is upgraded, it will have both a {@link FileKind#ADD} and a {@link
+     * FileKind#DELETE} entry. This test ensures that if the ADD entry cannot be read correctly
+     * when expiring, the data file won't be deleted. In this test we manually separate the ADD
+     * and DELETE entries into two manifest files and delete the ADD entry's manifest file to
+     * simulate the read exception.
+     */
+    @Test
+    public void testExpireWithMissingManifest() throws Exception {
+        TestFileStore store = 
createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
+        SnapshotManager snapshotManager = store.snapshotManager();
+        TestKeyValueGenerator gen =
+                new 
TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
+        BinaryRow partition = gen.getPartition(gen.next());
+
+        // snapshot 1: commit A to bucket 0
+        Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers = new 
HashMap<>();
+        List<KeyValue> kvs = partitionedData(5, gen);
+        writeData(store, kvs, partition, 0, writers);
+        commitData(store, commitIdentifier++, writers);
+
+        // snapshot 2: compact
+        writers.values().stream()
+                .flatMap(m -> m.values().stream())
+                .forEach(
+                        writer -> {
+                            try {
+                                writer.compact(true);
+                                writer.sync();
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+        FileStoreTestUtils.commitData(store, commitIdentifier++, writers);
+
+        // check that there are one data file and get its path
+        FileStorePathFactory pathFactory = store.pathFactory();
+        Path bucket0 = pathFactory.bucketPath(partition, 0);
+        List<Path> datafiles =
+                Files.walk(Paths.get(bucket0.toString()))
+                        .filter(Files::isRegularFile)
+                        .filter(p -> 
p.getFileName().toString().startsWith("data"))
+                        .map(p -> new Path(p.toString()))
+                        .collect(Collectors.toList());
+        assertThat(datafiles.size()).isEqualTo(1);
+        Path dataFileA = datafiles.get(0);
+
+        // check the snapshot 2 has two delta manifests entry (-A, level=0), 
(+A, level=5)
+        Snapshot snapshot2 = snapshotManager.snapshot(2);
+        ManifestList manifestList = store.manifestListFactory().create();
+        ManifestFile manifestFile = store.manifestFileFactory().create();
+
+        String deltaManifestList = snapshot2.deltaManifestList();
+        List<ManifestFileMeta> manifestFileMetas = 
manifestList.read(snapshot2.deltaManifestList());
+        assertThat(manifestFileMetas.size()).isEqualTo(1);
+
+        String deltaManifestFile = manifestFileMetas.get(0).fileName();
+        List<ManifestEntry> entries = manifestFile.read(deltaManifestFile);
+        assertThat(entries.size()).isEqualTo(2);
+
+        ManifestEntry addEntry = null, deleteEntry = null;
+        for (ManifestEntry entry : entries) {
+            assertThat(entry.file().fileName()).isEqualTo(dataFileA.getName());
+            if (entry.kind() == FileKind.ADD) {
+                assertThat(addEntry).isNull();
+                addEntry = entry;
+                assertThat(entry.file().level()).isGreaterThan(0);
+            } else {
+                assertThat(deleteEntry).isNull();
+                deleteEntry = entry;
+                assertThat(entry.file().level()).isEqualTo(0);
+            }
+        }
+        assertThat(addEntry).isNotNull();
+        assertThat(deleteEntry).isNotNull();
+
+        // separate two entries to two manifest files and delete the (+A, 
level=5) manifest
+        
fileIO.deleteQuietly(pathFactory.toManifestListPath(deltaManifestList));
+        
fileIO.deleteQuietly(pathFactory.toManifestFilePath(deltaManifestFile));
+
+        List<ManifestFileMeta> newAddManifests =
+                manifestFile.write(Collections.singletonList(addEntry));
+        assertThat(newAddManifests.size()).isEqualTo(1);
+        String newAddManifestFileName = newAddManifests.get(0).fileName();
+        
fileIO.deleteQuietly(pathFactory.toManifestFilePath(newAddManifestFileName));
+
+        List<ManifestFileMeta> newDeleteManifests =
+                manifestFile.write(Collections.singletonList(deleteEntry));
+        assertThat(newDeleteManifests.size()).isEqualTo(1);
+
+        List<ManifestFileMeta> newManifests =
+                Arrays.asList(newAddManifests.get(0), 
newDeleteManifests.get(0));
+
+        String newManifestListName = manifestList.write(newManifests);
+
+        fileIO.rename(
+                pathFactory.toManifestListPath(newManifestListName),
+                pathFactory.toManifestListPath(deltaManifestList));
+
+        store.newExpire(1, 1, Long.MAX_VALUE).expire();
+
+        // check data file
+        assertPathExists(fileIO, dataFileA);
+    }
+
     private TestFileStore createStore(TestKeyValueGenerator.GeneratorMode 
mode) throws Exception {
         return createStore(mode, 2);
     }

Reply via email to