This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 26ac4b1c02 [core] Fix that snapshot expire might delete files used by tag mistakenly (#5237)
26ac4b1c02 is described below
commit 26ac4b1c02535be52da5c3b5ec35df3e86f2607b
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 10 10:07:10 2025 +0800
[core] Fix that snapshot expire might delete files used by tag mistakenly (#5237)
---
.../org/apache/paimon/operation/FileDeletionBase.java | 17 +++++++----------
.../java/org/apache/paimon/operation/TagDeletion.java | 6 +++++-
2 files changed, 12 insertions(+), 11 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index def9052021..ac5a56d43a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -340,9 +340,11 @@ public abstract class FileDeletionBase<T extends Snapshot> {
if (index >= 0) {
Snapshot previousTag = taggedSnapshots.get(index);
if (previousTag.id() != cachedTag) {
- cachedTag = previousTag.id();
+ cachedTag = 0;
cachedTagDataFiles.clear();
addMergedDataFiles(cachedTagDataFiles, previousTag);
+ // update cachedTag after read tag successfully
+ cachedTag = previousTag.id();
}
return entry -> containsDataFile(cachedTagDataFiles, entry);
}
@@ -359,7 +361,7 @@ public abstract class FileDeletionBase<T extends Snapshot> {
try {
return manifestList.read(manifestListName);
} catch (Exception e) {
- LOG.warn("Failed to read manifest list file " + manifestListName,
e);
+ LOG.warn("Failed to read manifest list file {}", manifestListName,
e);
return Collections.emptyList();
}
}
@@ -371,7 +373,8 @@ public abstract class FileDeletionBase<T extends Snapshot> {
protected void addMergedDataFiles(
Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, Snapshot
snapshot)
throws IOException {
- for (ExpireFileEntry entry : readMergedDataFiles(snapshot)) {
+ for (ExpireFileEntry entry :
+ readMergedDataFiles(manifestList.readDataManifests(snapshot)))
{
dataFiles
.computeIfAbsent(entry.partition(), p -> new HashMap<>())
.computeIfAbsent(entry.bucket(), b -> new HashSet<>())
@@ -379,14 +382,8 @@ public abstract class FileDeletionBase<T extends Snapshot> {
}
}
- protected Collection<ExpireFileEntry> readMergedDataFiles(Snapshot
snapshot)
+ protected Collection<ExpireFileEntry>
readMergedDataFiles(List<ManifestFileMeta> manifests)
throws IOException {
- // read data manifests
-
- List<ManifestFileMeta> manifests =
tryReadManifestList(snapshot.baseManifestList());
- manifests.addAll(tryReadManifestList(snapshot.deltaManifestList()));
-
- // read and merge manifest entries
Map<Identifier, ExpireFileEntry> map = new HashMap<>();
for (ManifestFileMeta manifest : manifests) {
List<ExpireFileEntry> entries =
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
index bac941de6f..8bf9345643 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
@@ -26,6 +26,7 @@ import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.manifest.ExpireFileEntry;
import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.utils.DataFilePathFactories;
@@ -73,7 +74,10 @@ public class TagDeletion extends FileDeletionBase<Snapshot> {
public void cleanUnusedDataFiles(Snapshot taggedSnapshot,
Predicate<ExpireFileEntry> skipper) {
Collection<ExpireFileEntry> manifestEntries;
try {
- manifestEntries = readMergedDataFiles(taggedSnapshot);
+ List<ManifestFileMeta> manifests =
+ tryReadManifestList(taggedSnapshot.baseManifestList());
+
manifests.addAll(tryReadManifestList(taggedSnapshot.deltaManifestList()));
+ manifestEntries = readMergedDataFiles(manifests);
} catch (IOException e) {
LOG.info("Skip data file clean for the tag of id {}.",
taggedSnapshot.id(), e);
return;