This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9191e2e1f5 [core] Optimize memory usage for expiring snapshots and tags (#4655)
9191e2e1f5 is described below
commit 9191e2e1f556bc91af7941046869a516c15d5fe8
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Dec 9 13:55:11 2024 +0800
[core] Optimize memory usage for expiring snapshots and tags (#4655)
This closes #4655.
---
.../apache/paimon/manifest/ExpireFileEntry.java | 86 +++++++++++++++++++
.../java/org/apache/paimon/manifest/FileEntry.java | 2 +
.../org/apache/paimon/manifest/ManifestEntry.java | 5 ++
.../org/apache/paimon/manifest/ManifestFile.java | 10 +++
.../apache/paimon/manifest/SimpleFileEntry.java | 5 ++
.../apache/paimon/operation/ChangelogDeletion.java | 4 +-
.../apache/paimon/operation/FileDeletionBase.java | 99 ++++++++++------------
.../apache/paimon/operation/SnapshotDeletion.java | 12 +--
.../org/apache/paimon/operation/TagDeletion.java | 17 ++--
.../apache/paimon/table/ExpireChangelogImpl.java | 4 +-
.../apache/paimon/table/ExpireSnapshotsImpl.java | 4 +-
.../org/apache/paimon/table/RollbackHelper.java | 4 +-
.../java/org/apache/paimon/utils/TagManager.java | 4 +-
.../paimon/operation/ExpireSnapshotsTest.java | 5 +-
14 files changed, 183 insertions(+), 78 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java
new file mode 100644
index 0000000000..060360623c
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import org.apache.paimon.data.BinaryRow;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * A {@link SimpleFileEntry} that additionally carries the {@link FileSource} of its data file.
+ *
+ * <p>File deletion for expiring snapshots, tags and changelogs works on this type instead of on
+ * full {@link ManifestEntry} objects; {@link #from(ManifestEntry)} copies over only the fields that
+ * deciding whether a data file can be deleted requires.
+ */
+public class ExpireFileEntry extends SimpleFileEntry {
+
+    /** Source of the data file, or {@code null} if the originating entry did not record one. */
+    @Nullable private final FileSource fileSource;
+
+    public ExpireFileEntry(
+            FileKind kind,
+            BinaryRow partition,
+            int bucket,
+            int level,
+            String fileName,
+            List<String> extraFiles,
+            @Nullable byte[] embeddedIndex,
+            BinaryRow minKey,
+            BinaryRow maxKey,
+            @Nullable FileSource fileSource) {
+        super(kind, partition, bucket, level, fileName, extraFiles, embeddedIndex, minKey, maxKey);
+        this.fileSource = fileSource;
+    }
+
+    /** Returns the source of the data file, or {@link Optional#empty()} if it was not recorded. */
+    public Optional<FileSource> fileSource() {
+        return fileSource == null ? Optional.empty() : Optional.of(fileSource);
+    }
+
+    /**
+     * Builds an {@link ExpireFileEntry} from a full manifest entry.
+     *
+     * @param entry the manifest entry to convert
+     * @return a new entry with the same kind, partition, bucket, level, file name, extra files,
+     *     embedded index, min/max keys and file source as {@code entry}
+     */
+    public static ExpireFileEntry from(ManifestEntry entry) {
+        FileSource source = entry.file().fileSource().orElse(null);
+        return new ExpireFileEntry(
+                entry.kind(),
+                entry.partition(),
+                entry.bucket(),
+                entry.level(),
+                entry.fileName(),
+                entry.extraFiles(),
+                entry.file().embeddedIndex(),
+                entry.minKey(),
+                entry.maxKey(),
+                source);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass() || !super.equals(o)) {
+            return false;
+        }
+        ExpireFileEntry other = (ExpireFileEntry) o;
+        // Reference comparison; NOTE(review): only equivalent to equals() if FileSource is an
+        // enum — confirm.
+        return fileSource == other.fileSource;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), fileSource);
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index 91e07a369d..a2569beac6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -60,6 +60,8 @@ public interface FileEntry {
BinaryRow maxKey();
+    /**
+     * Returns the names of the extra files associated with this entry's data file. File deletion
+     * resolves these names against the same bucket path as the data file itself.
+     */
+    List<String> extraFiles();
+
/**
* The same {@link Identifier} indicates that the {@link ManifestEntry}
refers to the same data
* file.
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index ee5dc2c344..626e0a5d46 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -102,6 +102,11 @@ public class ManifestEntry implements FileEntry {
return file.maxKey();
}
+    /** Delegates to the extra files recorded on the wrapped {@code file}. */
+    @Override
+    public List<String> extraFiles() {
+        return this.file.extraFiles();
+    }
+
public int totalBuckets() {
return totalBuckets;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index 128f5262a5..1aba2ef195 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -39,6 +39,7 @@ import org.apache.paimon.utils.VersionedObjectSerializer;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -84,6 +85,15 @@ public class ManifestFile extends ObjectsFile<ManifestEntry>
{
return suggestedFileSize;
}
+    /**
+     * Reads a manifest file and converts each {@link ManifestEntry} into an {@link
+     * ExpireFileEntry}, so callers only keep the fields that file expiration needs.
+     *
+     * <p>NOTE(review): this goes through {@code read(...)} rather than {@code
+     * readWithIOException(...)} and declares no checked exception. Confirm that callers which
+     * catch only {@code IOException} (e.g. tag deletion around {@code readMergedDataFiles})
+     * still intercept read failures.
+     *
+     * @param fileName name of the manifest file to read
+     * @param fileSize size of the manifest file; may be {@code null}
+     * @return the converted entries, in the order they were read
+     */
+    public List<ExpireFileEntry> readExpireFileEntries(String fileName, @Nullable Long fileSize) {
+        List<ManifestEntry> entries = read(fileName, fileSize);
+        List<ExpireFileEntry> converted = new ArrayList<>(entries.size());
+        entries.forEach(entry -> converted.add(ExpireFileEntry.from(entry)));
+        return converted;
+    }
+
/**
* Write several {@link ManifestEntry}s into manifest files.
*
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
index 8d33ede0c4..fdaed2b85a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
@@ -117,6 +117,11 @@ public class SimpleFileEntry implements FileEntry {
return maxKey;
}
+    /** Returns the extra file names captured when this entry was created. */
+    @Override
+    public List<String> extraFiles() {
+        return this.extraFiles;
+    }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java
b/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java
index c20405ff26..069e57bb3d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java
@@ -23,8 +23,8 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.ExpireFileEntry;
import org.apache.paimon.manifest.IndexManifestEntry;
-import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
@@ -60,7 +60,7 @@ public class ChangelogDeletion extends
FileDeletionBase<Changelog> {
}
@Override
- public void cleanUnusedDataFiles(Changelog changelog,
Predicate<ManifestEntry> skipper) {
+ public void cleanUnusedDataFiles(Changelog changelog,
Predicate<ExpireFileEntry> skipper) {
if (changelog.changelogManifestList() != null) {
deleteAddedDataFiles(changelog.changelogManifestList());
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index 303a074b0c..cfecd767b6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -24,10 +24,11 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.ExpireFileEntry;
import org.apache.paimon.manifest.FileEntry;
+import org.apache.paimon.manifest.FileEntry.Identifier;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.IndexManifestEntry;
-import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
@@ -46,7 +47,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -54,7 +54,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Predicate;
-import java.util.stream.Collectors;
/**
* Base class for file deletion including methods for clean data files,
manifest files and empty
@@ -110,7 +109,7 @@ public abstract class FileDeletionBase<T extends Snapshot> {
* @param skipper if the test result of a data file is true, it will be
skipped when deleting;
* else it will be deleted
*/
- public abstract void cleanUnusedDataFiles(T snapshot,
Predicate<ManifestEntry> skipper);
+ public abstract void cleanUnusedDataFiles(T snapshot,
Predicate<ExpireFileEntry> skipper);
/**
* Clean metadata files that will not be used anymore of a snapshot,
including data manifests,
@@ -164,21 +163,23 @@ public abstract class FileDeletionBase<T extends
Snapshot> {
deletionBuckets.clear();
}
- protected void recordDeletionBuckets(ManifestEntry entry) {
+ protected void recordDeletionBuckets(ExpireFileEntry entry) {
deletionBuckets
.computeIfAbsent(entry.partition(), p -> new HashSet<>())
.add(entry.bucket());
}
- public void cleanUnusedDataFiles(String manifestList,
Predicate<ManifestEntry> skipper) {
+ public void cleanUnusedDataFiles(String manifestList,
Predicate<ExpireFileEntry> skipper) {
// try read manifests
- List<String> manifestFileNames =
readManifestFileNames(tryReadManifestList(manifestList));
- List<ManifestEntry> manifestEntries;
+ List<ManifestFileMeta> manifests = tryReadManifestList(manifestList);
+ List<ExpireFileEntry> manifestEntries;
// data file path -> (original manifest entry, extra file paths)
- Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete = new
HashMap<>();
- for (String manifest : manifestFileNames) {
+ Map<Path, Pair<ExpireFileEntry, List<Path>>> dataFileToDelete = new
HashMap<>();
+ for (ManifestFileMeta manifest : manifests) {
try {
- manifestEntries = manifestFile.read(manifest);
+ manifestEntries =
+ manifestFile.readExpireFileEntries(
+ manifest.fileName(), manifest.fileSize());
} catch (Exception e) {
// cancel deletion if any exception occurs
LOG.warn("Failed to read some manifest files. Cancel
deletion.", e);
@@ -192,12 +193,12 @@ public abstract class FileDeletionBase<T extends
Snapshot> {
}
protected void doCleanUnusedDataFile(
- Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete,
- Predicate<ManifestEntry> skipper) {
+ Map<Path, Pair<ExpireFileEntry, List<Path>>> dataFileToDelete,
+ Predicate<ExpireFileEntry> skipper) {
List<Path> actualDataFileToDelete = new ArrayList<>();
dataFileToDelete.forEach(
(path, pair) -> {
- ManifestEntry entry = pair.getLeft();
+ ExpireFileEntry entry = pair.getLeft();
// check whether we should skip the data file
if (!skipper.test(entry)) {
// delete data files
@@ -211,20 +212,20 @@ public abstract class FileDeletionBase<T extends
Snapshot> {
}
protected void getDataFileToDelete(
- Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete,
- List<ManifestEntry> dataFileEntries) {
+ Map<Path, Pair<ExpireFileEntry, List<Path>>> dataFileToDelete,
+ List<ExpireFileEntry> dataFileEntries) {
// we cannot delete a data file directly when we meet a DELETE entry,
because that
// file might be upgraded
- for (ManifestEntry entry : dataFileEntries) {
+ for (ExpireFileEntry entry : dataFileEntries) {
Path bucketPath = pathFactory.bucketPath(entry.partition(),
entry.bucket());
- Path dataFilePath = new Path(bucketPath, entry.file().fileName());
+ Path dataFilePath = new Path(bucketPath, entry.fileName());
switch (entry.kind()) {
case ADD:
dataFileToDelete.remove(dataFilePath);
break;
case DELETE:
- List<Path> extraFiles = new
ArrayList<>(entry.file().extraFiles().size());
- for (String file : entry.file().extraFiles()) {
+ List<Path> extraFiles = new
ArrayList<>(entry.extraFiles().size());
+ for (String file : entry.extraFiles()) {
extraFiles.add(new Path(bucketPath, file));
}
dataFileToDelete.put(dataFilePath, Pair.of(entry,
extraFiles));
@@ -242,27 +243,28 @@ public abstract class FileDeletionBase<T extends
Snapshot> {
* @param manifestListName name of manifest list
*/
public void deleteAddedDataFiles(String manifestListName) {
- List<String> manifestFileNames =
- readManifestFileNames(tryReadManifestList(manifestListName));
- for (String file : manifestFileNames) {
+ List<ManifestFileMeta> manifests =
tryReadManifestList(manifestListName);
+ for (ManifestFileMeta manifest : manifests) {
try {
- List<ManifestEntry> manifestEntries = manifestFile.read(file);
+ List<ExpireFileEntry> manifestEntries =
+ manifestFile.readExpireFileEntries(
+ manifest.fileName(), manifest.fileSize());
deleteAddedDataFiles(manifestEntries);
} catch (Exception e) {
// We want to delete the data file, so just ignore the
unavailable files
- LOG.info("Failed to read manifest " + file + ". Ignore it.",
e);
+ LOG.info("Failed to read manifest " + manifest.fileName() + ".
Ignore it.", e);
}
}
}
- private void deleteAddedDataFiles(List<ManifestEntry> manifestEntries) {
+ private void deleteAddedDataFiles(List<ExpireFileEntry> manifestEntries) {
List<Path> dataFileToDelete = new ArrayList<>();
- for (ManifestEntry entry : manifestEntries) {
+ for (ExpireFileEntry entry : manifestEntries) {
if (entry.kind() == FileKind.ADD) {
dataFileToDelete.add(
new Path(
pathFactory.bucketPath(entry.partition(),
entry.bucket()),
- entry.file().fileName()));
+ entry.fileName()));
recordDeletionBuckets(entry);
}
}
@@ -327,7 +329,7 @@ public abstract class FileDeletionBase<T extends Snapshot> {
cleanUnusedStatisticsManifests(snapshot, skippingSet);
}
- public Predicate<ManifestEntry> createDataFileSkipperForTags(
+ public Predicate<ExpireFileEntry> createDataFileSkipperForTags(
List<Snapshot> taggedSnapshots, long expiringSnapshotId) throws
Exception {
int index = SnapshotManager.findPreviousSnapshot(taggedSnapshots,
expiringSnapshotId);
// refresh tag data files
@@ -358,18 +360,6 @@ public abstract class FileDeletionBase<T extends Snapshot>
{
}
}
- protected List<String> tryReadDataManifests(Snapshot snapshot) {
- List<ManifestFileMeta> manifestFileMetas =
tryReadManifestList(snapshot.baseManifestList());
-
manifestFileMetas.addAll(tryReadManifestList(snapshot.deltaManifestList()));
- return readManifestFileNames(manifestFileMetas);
- }
-
- protected List<String> readManifestFileNames(List<ManifestFileMeta>
manifestFileMetas) {
- return manifestFileMetas.stream()
- .map(ManifestFileMeta::fileName)
- .collect(Collectors.toCollection(LinkedList::new));
- }
-
/**
* NOTE: This method is used for building data file skipping set. If
failed to read some
* manifests, it will throw exception which callers must handle.
@@ -377,23 +367,26 @@ public abstract class FileDeletionBase<T extends
Snapshot> {
protected void addMergedDataFiles(
Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, Snapshot
snapshot)
throws IOException {
- for (ManifestEntry entry : readMergedDataFiles(snapshot)) {
+ for (ExpireFileEntry entry : readMergedDataFiles(snapshot)) {
dataFiles
.computeIfAbsent(entry.partition(), p -> new HashMap<>())
.computeIfAbsent(entry.bucket(), b -> new HashSet<>())
- .add(entry.file().fileName());
+ .add(entry.fileName());
}
}
- protected Collection<ManifestEntry> readMergedDataFiles(Snapshot snapshot)
throws IOException {
+ protected Collection<ExpireFileEntry> readMergedDataFiles(Snapshot
snapshot)
+ throws IOException {
// read data manifests
- List<String> files = tryReadDataManifests(snapshot);
+
+ List<ManifestFileMeta> manifests =
tryReadManifestList(snapshot.baseManifestList());
+ manifests.addAll(tryReadManifestList(snapshot.deltaManifestList()));
// read and merge manifest entries
- Map<ManifestEntry.Identifier, ManifestEntry> map = new HashMap<>();
- for (String manifest : files) {
- List<ManifestEntry> entries;
- entries = manifestFile.readWithIOException(manifest);
+ Map<Identifier, ExpireFileEntry> map = new HashMap<>();
+ for (ManifestFileMeta manifest : manifests) {
+ List<ExpireFileEntry> entries =
+ manifestFile.readExpireFileEntries(manifest.fileName(),
manifest.fileSize());
FileEntry.mergeEntries(entries, map);
}
@@ -401,12 +394,12 @@ public abstract class FileDeletionBase<T extends
Snapshot> {
}
protected boolean containsDataFile(
- Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, ManifestEntry
testee) {
- Map<Integer, Set<String>> buckets = dataFiles.get(testee.partition());
+ Map<BinaryRow, Map<Integer, Set<String>>> dataFiles,
ExpireFileEntry entry) {
+ Map<Integer, Set<String>> buckets = dataFiles.get(entry.partition());
if (buckets != null) {
- Set<String> fileNames = buckets.get(testee.bucket());
+ Set<String> fileNames = buckets.get(entry.bucket());
if (fileNames != null) {
- return fileNames.contains(testee.file().fileName());
+ return fileNames.contains(entry.fileName());
}
}
return false;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
index d86907ecea..7d55b64c8e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
@@ -23,8 +23,8 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.manifest.ExpireFileEntry;
import org.apache.paimon.manifest.FileSource;
-import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.stats.StatsFileHandler;
@@ -65,15 +65,15 @@ public class SnapshotDeletion extends
FileDeletionBase<Snapshot> {
}
@Override
- public void cleanUnusedDataFiles(Snapshot snapshot,
Predicate<ManifestEntry> skipper) {
+ public void cleanUnusedDataFiles(Snapshot snapshot,
Predicate<ExpireFileEntry> skipper) {
if (changelogDecoupled && !produceChangelog) {
// Skip clean the 'APPEND' data files.If we do not have the file
source information
// eg: the old version table file, we just skip clean this here,
let it done by
// ExpireChangelogImpl
- Predicate<ManifestEntry> enriched =
+ Predicate<ExpireFileEntry> enriched =
manifestEntry ->
skipper.test(manifestEntry)
- ||
(manifestEntry.file().fileSource().orElse(FileSource.APPEND)
+ ||
(manifestEntry.fileSource().orElse(FileSource.APPEND)
== FileSource.APPEND);
cleanUnusedDataFiles(snapshot.deltaManifestList(), enriched);
} else {
@@ -92,8 +92,8 @@ public class SnapshotDeletion extends
FileDeletionBase<Snapshot> {
}
@VisibleForTesting
- void cleanUnusedDataFile(List<ManifestEntry> dataFileLog) {
- Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete = new
HashMap<>();
+ void cleanUnusedDataFile(List<ExpireFileEntry> dataFileLog) {
+ Map<Path, Pair<ExpireFileEntry, List<Path>>> dataFileToDelete = new
HashMap<>();
getDataFileToDelete(dataFileToDelete, dataFileLog);
doCleanUnusedDataFile(dataFileToDelete, f -> false);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
index a6cd338d58..2722ed0c7e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
@@ -23,7 +23,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
-import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ExpireFileEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.stats.StatsFileHandler;
@@ -68,8 +68,8 @@ public class TagDeletion extends FileDeletionBase<Snapshot> {
}
@Override
- public void cleanUnusedDataFiles(Snapshot taggedSnapshot,
Predicate<ManifestEntry> skipper) {
- Collection<ManifestEntry> manifestEntries;
+ public void cleanUnusedDataFiles(Snapshot taggedSnapshot,
Predicate<ExpireFileEntry> skipper) {
+ Collection<ExpireFileEntry> manifestEntries;
try {
manifestEntries = readMergedDataFiles(taggedSnapshot);
} catch (IOException e) {
@@ -78,11 +78,11 @@ public class TagDeletion extends FileDeletionBase<Snapshot>
{
}
Set<Path> dataFileToDelete = new HashSet<>();
- for (ManifestEntry entry : manifestEntries) {
+ for (ExpireFileEntry entry : manifestEntries) {
if (!skipper.test(entry)) {
Path bucketPath = pathFactory.bucketPath(entry.partition(),
entry.bucket());
- dataFileToDelete.add(new Path(bucketPath,
entry.file().fileName()));
- for (String file : entry.file().extraFiles()) {
+ dataFileToDelete.add(new Path(bucketPath, entry.fileName()));
+ for (String file : entry.extraFiles()) {
dataFileToDelete.add(new Path(bucketPath, file));
}
@@ -98,11 +98,12 @@ public class TagDeletion extends FileDeletionBase<Snapshot>
{
cleanUnusedManifests(taggedSnapshot, skippingSet, true, false);
}
- public Predicate<ManifestEntry> dataFileSkipper(Snapshot fromSnapshot)
throws Exception {
+ public Predicate<ExpireFileEntry> dataFileSkipper(Snapshot fromSnapshot)
throws Exception {
return dataFileSkipper(Collections.singletonList(fromSnapshot));
}
- public Predicate<ManifestEntry> dataFileSkipper(List<Snapshot>
fromSnapshots) throws Exception {
+ public Predicate<ExpireFileEntry> dataFileSkipper(List<Snapshot>
fromSnapshots)
+ throws Exception {
Map<BinaryRow, Map<Integer, Set<String>>> skipped = new HashMap<>();
for (Snapshot snapshot : fromSnapshots) {
addMergedDataFiles(skipped, snapshot);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
index ce54975450..1ffa7485ae 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
@@ -21,7 +21,7 @@ package org.apache.paimon.table;
import org.apache.paimon.Changelog;
import org.apache.paimon.Snapshot;
import org.apache.paimon.consumer.ConsumerManager;
-import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ExpireFileEntry;
import org.apache.paimon.operation.ChangelogDeletion;
import org.apache.paimon.options.ExpireConfig;
import org.apache.paimon.utils.Preconditions;
@@ -147,7 +147,7 @@ public class ExpireChangelogImpl implements ExpireSnapshots
{
LOG.debug("Ready to delete changelog files from changelog #" +
id);
}
Changelog changelog = snapshotManager.longLivedChangelog(id);
- Predicate<ManifestEntry> skipper;
+ Predicate<ExpireFileEntry> skipper;
try {
skipper =
changelogDeletion.createDataFileSkipperForTags(taggedSnapshots, id);
} catch (Exception e) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
index 2c83b63c97..dc1c2d6bdb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
@@ -22,7 +22,7 @@ import org.apache.paimon.Changelog;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.ConsumerManager;
-import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ExpireFileEntry;
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.options.ExpireConfig;
import org.apache.paimon.utils.Preconditions;
@@ -176,7 +176,7 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots
{
continue;
}
// expire merge tree files and collect changed buckets
- Predicate<ManifestEntry> skipper;
+ Predicate<ExpireFileEntry> skipper;
try {
skipper =
snapshotDeletion.createDataFileSkipperForTags(taggedSnapshots, id);
} catch (Exception e) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
index 1eb4ccf001..29fecec113 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
@@ -21,7 +21,7 @@ package org.apache.paimon.table;
import org.apache.paimon.Changelog;
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ExpireFileEntry;
import org.apache.paimon.operation.ChangelogDeletion;
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.operation.TagDeletion;
@@ -205,7 +205,7 @@ public class RollbackHelper {
}
// delete data files
- Predicate<ManifestEntry> dataFileSkipper = null;
+ Predicate<ExpireFileEntry> dataFileSkipper = null;
boolean success = true;
try {
dataFileSkipper = tagDeletion.dataFileSkipper(retainedSnapshot);
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 1e05a100d7..4019395d8d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -23,7 +23,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ExpireFileEntry;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.Tag;
@@ -255,7 +255,7 @@ public class TagManager {
skippedSnapshots.add(right);
// delete data files and empty directories
- Predicate<ManifestEntry> dataFileSkipper = null;
+ Predicate<ExpireFileEntry> dataFileSkipper = null;
boolean success = true;
try {
dataFileSkipper = tagDeletion.dataFileSkipper(skippedSnapshots);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
index 96dce3d784..9dc9834373 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
@@ -29,6 +29,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ExpireFileEntry;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.manifest.ManifestEntry;
@@ -218,7 +219,9 @@ public class ExpireSnapshotsTest {
ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition,
0, 1, dataFile);
// expire
- expire.snapshotDeletion().cleanUnusedDataFile(Arrays.asList(add,
delete));
+ expire.snapshotDeletion()
+ .cleanUnusedDataFile(
+ Arrays.asList(ExpireFileEntry.from(add),
ExpireFileEntry.from(delete)));
// check
assertThat(fileIO.exists(myDataFile)).isFalse();