This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b87538522 [core] Fix that tag deletion might delete used data files (#2409)
b87538522 is described below
commit b875385229cc08ba6d82b727ed8ca4652695c98f
Author: yuzelin <[email protected]>
AuthorDate: Wed Nov 29 12:23:30 2023 +0800
[core] Fix that tag deletion might delete used data files (#2409)
---
.../apache/paimon/operation/FileDeletionBase.java | 28 ++++++------
.../org/apache/paimon/operation/TagDeletion.java | 52 ++++++++++------------
.../java/org/apache/paimon/utils/ObjectsFile.java | 34 +++++++++-----
.../paimon/operation/FileStoreExpireTestBase.java | 9 +++-
.../operation/UncleanedFileStoreExpireTest.java | 41 +++++++++++++++++
5 files changed, 108 insertions(+), 56 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index a26bf7b5a..e2c094ea0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -229,28 +229,28 @@ public abstract class FileDeletionBase {
*/
protected void addMergedDataFiles(
Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, Snapshot snapshot)
- throws Exception {
+ throws IOException {
+ for (ManifestEntry entry : readMergedDataFiles(snapshot)) {
+ dataFiles
+ .computeIfAbsent(entry.partition(), p -> new HashMap<>())
+ .computeIfAbsent(entry.bucket(), b -> new HashSet<>())
+ .add(entry.file().fileName());
+ }
+ }
+
+ protected Collection<ManifestEntry> readMergedDataFiles(Snapshot snapshot) throws IOException {
// read data manifests
List<String> files = tryReadDataManifests(snapshot);
- // try merging
+ // read and merge manifest entries
Map<ManifestEntry.Identifier, ManifestEntry> map = new HashMap<>();
- for (String file : files) {
+ for (String manifest : files) {
List<ManifestEntry> entries;
- try {
- entries = manifestFile.read(file);
- } catch (Exception e) {
- throw new Exception("Failed to read manifest file " + file, e);
- }
+ entries = manifestFile.readWithIOException(manifest);
ManifestEntry.mergeEntries(entries, map);
}
- for (ManifestEntry entry : map.values()) {
- dataFiles
- .computeIfAbsent(entry.partition(), p -> new HashMap<>())
- .computeIfAbsent(entry.bucket(), b -> new HashSet<>())
- .add(entry.file().fileName());
- }
+ return map.values();
}
protected boolean containsDataFile(
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
index 09531e75c..96b0a3ee6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
@@ -31,6 +31,8 @@ import org.apache.paimon.utils.FileStorePathFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -55,43 +57,35 @@ public class TagDeletion extends FileDeletionBase {
@Override
public void cleanUnusedDataFiles(Snapshot taggedSnapshot, Predicate<ManifestEntry> skipper) {
- cleanUnusedDataFiles(tryReadDataManifests(taggedSnapshot), skipper);
- }
-
- @Override
- public void cleanUnusedManifests(Snapshot taggedSnapshot, Set<String> skippingSet) {
- // doesn't clean changelog files because they are handled by SnapshotDeletion
- cleanUnusedManifests(taggedSnapshot, skippingSet, false);
- }
+ Collection<ManifestEntry> manifestEntries;
+ try {
+ manifestEntries = readMergedDataFiles(taggedSnapshot);
+ } catch (IOException e) {
+ LOG.info("Skip data file clean for the tag of id {}.", taggedSnapshot.id(), e);
+ return;
+ }
- private void cleanUnusedDataFiles(
- List<String> manifestFileNames, Predicate<ManifestEntry> skipper) {
Set<Path> dataFileToDelete = new HashSet<>();
- for (String manifest : manifestFileNames) {
- List<ManifestEntry> manifestEntries;
- try {
- manifestEntries = manifestFile.read(manifest);
- } catch (Exception e) {
- // We want to delete the data file, so just ignore the unavailable files
- LOG.info("Failed to read manifest " + manifest + ". Ignore it.", e);
- continue;
- }
-
- for (ManifestEntry entry : manifestEntries) {
- if (!skipper.test(entry)) {
- Path bucketPath = pathFactory.bucketPath(entry.partition(), entry.bucket());
- dataFileToDelete.add(new Path(bucketPath, entry.file().fileName()));
- for (String file : entry.file().extraFiles()) {
- dataFileToDelete.add(new Path(bucketPath, file));
- }
-
- recordDeletionBuckets(entry);
+ for (ManifestEntry entry : manifestEntries) {
+ if (!skipper.test(entry)) {
+ Path bucketPath = pathFactory.bucketPath(entry.partition(), entry.bucket());
+ dataFileToDelete.add(new Path(bucketPath, entry.file().fileName()));
+ for (String file : entry.file().extraFiles()) {
+ dataFileToDelete.add(new Path(bucketPath, file));
}
+
+ recordDeletionBuckets(entry);
}
}
deleteFiles(dataFileToDelete, fileIO::deleteQuietly);
}
+ @Override
+ public void cleanUnusedManifests(Snapshot taggedSnapshot, Set<String> skippingSet) {
+ // doesn't clean changelog files because they are handled by SnapshotDeletion
+ cleanUnusedManifests(taggedSnapshot, skippingSet, false);
+ }
+
public Predicate<ManifestEntry> dataFileSkipper(Snapshot fromSnapshot) throws Exception {
return dataFileSkipper(Collections.singletonList(fromSnapshot));
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
index 6ccd9111b..51b504926 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
@@ -78,6 +78,10 @@ public abstract class ObjectsFile<T> {
return read(fileName, Filter.alwaysTrue(), Filter.alwaysTrue());
}
+ public List<T> readWithIOException(String fileName) throws IOException {
+ return readWithIOException(fileName, Filter.alwaysTrue(), Filter.alwaysTrue());
+ }
+
public boolean exists(String fileName) {
try {
return fileIO.exists(pathFactory.toPath(fileName));
@@ -89,23 +93,29 @@ public abstract class ObjectsFile<T> {
public List<T> read(
String fileName, Filter<InternalRow> loadFilter, Filter<InternalRow> readFilter) {
try {
- if (cache != null) {
- return cache.read(fileName, loadFilter, readFilter);
- }
-
- RecordReader<InternalRow> reader =
- createFormatReader(fileIO, readerFactory, pathFactory.toPath(fileName));
- if (readFilter != Filter.ALWAYS_TRUE) {
- reader = reader.filter(readFilter);
- }
- List<T> result = new ArrayList<>();
- reader.forEachRemaining(row -> result.add(serializer.fromRow(row)));
- return result;
+ return readWithIOException(fileName, loadFilter, readFilter);
} catch (IOException e) {
throw new RuntimeException("Failed to read manifest list " + fileName, e);
}
}
+ public List<T> readWithIOException(
+ String fileName, Filter<InternalRow> loadFilter, Filter<InternalRow> readFilter)
+ throws IOException {
+ if (cache != null) {
+ return cache.read(fileName, loadFilter, readFilter);
+ }
+
+ RecordReader<InternalRow> reader =
+ createFormatReader(fileIO, readerFactory, pathFactory.toPath(fileName));
+ if (readFilter != Filter.ALWAYS_TRUE) {
+ reader = reader.filter(readFilter);
+ }
+ List<T> result = new ArrayList<>();
+ reader.forEachRemaining(row -> result.add(serializer.fromRow(row)));
+ return result;
+ }
+
public String writeWithoutRolling(Collection<T> records) {
return writeWithoutRolling(records.iterator());
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java
index b5c3f7c34..ad1ef4c33 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java
@@ -110,11 +110,18 @@ public class FileStoreExpireTestBase {
protected void assertSnapshot(
int snapshotId, List<KeyValue> allData, List<Integer> snapshotPositions)
throws Exception {
+ assertSnapshot(snapshotManager.snapshot(snapshotId), allData, snapshotPositions);
+ }
+
+ protected void assertSnapshot(
+ Snapshot snapshot, List<KeyValue> allData, List<Integer> snapshotPositions)
+ throws Exception {
+ int snapshotId = (int) snapshot.id();
Map<BinaryRow, BinaryRow> expected =
store.toKvMap(allData.subList(0, snapshotPositions.get(snapshotId - 1)));
List<KeyValue> actualKvs =
store.readKvsFromManifestEntries(
- store.newScan().withSnapshot(snapshotId).plan().files(), false);
+ store.newScan().withSnapshot(snapshot).plan().files(), false);
gen.sort(actualKvs);
Map<BinaryRow, BinaryRow> actual = store.toKvMap(actualKvs);
assertThat(actual).isEqualTo(expected);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
index ef542e3f1..ecaf5ae8e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
@@ -19,7 +19,9 @@
package org.apache.paimon.operation;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.TagManager;
import org.junit.jupiter.api.Test;
@@ -27,6 +29,7 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
@@ -80,4 +83,42 @@ public class UncleanedFileStoreExpireTest extends FileStoreExpireTestBase {
assertThat(snapshotManager.snapshotExists(latestSnapshotId)).isTrue();
assertSnapshot(latestSnapshotId, allData, snapshotPositions);
}
+
+ @Test
+ public void testMixedSnapshotAndTagDeletion() throws Exception {
+ List<KeyValue> allData = new ArrayList<>();
+ List<Integer> snapshotPositions = new ArrayList<>();
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ commit(random.nextInt(10) + 30, allData, snapshotPositions);
+ int latestSnapshotId = snapshotManager.latestSnapshotId().intValue();
+ TagManager tagManager = store.newTagManager();
+
+ // create tags for each snapshot
+ for (int id = 1; id <= latestSnapshotId; id++) {
+ Snapshot snapshot = snapshotManager.snapshot(id);
+ tagManager.createTag(snapshot, "tag" + id);
+ }
+
+ // randomly expire snapshots
+ int expired = random.nextInt(latestSnapshotId / 2) + 1;
+ int retained = latestSnapshotId - expired;
+ store.newExpire(retained, retained, Long.MAX_VALUE).expire();
+
+ // randomly delete tags
+ for (int id = 1; id <= latestSnapshotId; id++) {
+ if (random.nextBoolean()) {
+ tagManager.deleteTag("tag" + id, store.newTagDeletion(), snapshotManager);
+ }
+ }
+
+ // check snapshots and tags
+ Set<Snapshot> allSnapshots = new HashSet<>();
+ snapshotManager.snapshots().forEachRemaining(allSnapshots::add);
+ allSnapshots.addAll(tagManager.taggedSnapshots());
+
+ for (Snapshot snapshot : allSnapshots) {
+ assertSnapshot(snapshot, allData, snapshotPositions);
+ }
+ }
}