This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 478f1670a [core] Support the rollback and orphan file clean with changelog decouple (#3144)
478f1670a is described below
commit 478f1670a829590880676cb5d76747af5514e0a1
Author: Aitozi <[email protected]>
AuthorDate: Sun Apr 7 10:40:14 2024 +0800
[core] Support the rollback and orphan file clean with changelog decouple (#3144)
---
.../apache/paimon/operation/OrphanFilesClean.java | 66 ++++-
.../org/apache/paimon/table/RollbackHelper.java | 17 +-
.../org/apache/paimon/utils/SnapshotManager.java | 49 +++-
.../paimon/operation/OrphanFilesCleanTest.java | 296 +++++++++++++++++----
.../paimon/table/FileStoreTableTestBase.java | 7 +
.../paimon/table/PrimaryKeyFileStoreTableTest.java | 80 ++++++
6 files changed, 455 insertions(+), 60 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index 13ae240b6..b68cf8cc7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -18,8 +18,10 @@
package org.apache.paimon.operation;
+import org.apache.paimon.Changelog;
import org.apache.paimon.FileStore;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
@@ -91,6 +93,7 @@ public class OrphanFilesClean {
// an estimated value of how many files were deleted
private int deletedFilesNum = 0;
+ private final List<Path> deleteFiles;
private long olderThanMillis = System.currentTimeMillis() -
TimeUnit.DAYS.toMillis(1);
public OrphanFilesClean(FileStoreTable table) {
@@ -104,6 +107,7 @@ public class OrphanFilesClean {
this.manifestList = store.manifestListFactory().create();
this.manifestFile = store.manifestFileFactory().create();
this.indexFileHandler = store.newIndexFileHandler();
+ this.deleteFiles = new ArrayList<>();
}
public OrphanFilesClean olderThan(String timestamp) {
@@ -124,6 +128,13 @@ public class OrphanFilesClean {
List<Path> nonSnapshotFiles =
snapshotManager.tryGetNonSnapshotFiles(this::oldEnough);
nonSnapshotFiles.forEach(this::deleteFileOrDirQuietly);
deletedFilesNum += nonSnapshotFiles.size();
+ deleteFiles.addAll(nonSnapshotFiles);
+
+ // specially handle the changelog directory
+ List<Path> nonChangelogFiles =
snapshotManager.tryGetNonChangelogFiles(this::oldEnough);
+ nonChangelogFiles.forEach(this::deleteFileOrDirQuietly);
+ deletedFilesNum += nonChangelogFiles.size();
+ deleteFiles.addAll(nonChangelogFiles);
Map<String, Path> candidates = getCandidateDeletingFiles();
Set<String> usedFiles = getUsedFiles();
@@ -136,10 +147,16 @@ public class OrphanFilesClean {
deleteFileOrDirQuietly(path);
}
deletedFilesNum += deleted.size();
+ deleteFiles.addAll(deleted.stream().map(candidates::get).collect(Collectors.toList()));
return deletedFilesNum;
}
+ @VisibleForTesting
+ List<Path> getDeleteFiles() {
+ return deleteFiles;
+ }
+
/** Get all the files used by snapshots and tags. */
private Set<String> getUsedFiles()
throws IOException, ExecutionException, InterruptedException {
@@ -147,6 +164,7 @@ public class OrphanFilesClean {
Set<Snapshot> readSnapshots = new
HashSet<>(snapshotManager.safelyGetAllSnapshots());
List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
readSnapshots.addAll(taggedSnapshots);
+ readSnapshots.addAll(snapshotManager.safelyGetAllChangelogs());
return FileUtils.COMMON_IO_FORK_JOIN_POOL
.submit(
@@ -154,8 +172,16 @@ public class OrphanFilesClean {
readSnapshots
.parallelStream()
.flatMap(
- snapshot ->
- getUsedFilesForSnapshot(snapshot).stream())
+ snapshot -> {
+ if (snapshot instanceof Changelog) {
+ return getUsedFilesForChangelog(
+ (Changelog) snapshot)
+ .stream();
+ } else {
+ return getUsedFilesForSnapshot(snapshot)
+ .stream();
+ }
+ })
.collect(Collectors.toSet()))
.get();
}
@@ -184,6 +210,41 @@ public class OrphanFilesClean {
}
}
+ private List<String> getUsedFilesForChangelog(Changelog changelog) {
+ List<String> files = new ArrayList<>();
+ if (changelog.changelogManifestList() != null) {
+ files.add(changelog.changelogManifestList());
+ }
+
+ try {
+ // try to read manifests
+ List<ManifestFileMeta> manifestFileMetas =
+ retryReadingFiles(
+ () ->
+ manifestList.readWithIOException(
+ changelog.changelogManifestList()));
+ if (manifestFileMetas == null) {
+ return Collections.emptyList();
+ }
+ List<String> manifestFileName =
+ manifestFileMetas.stream()
+ .map(ManifestFileMeta::fileName)
+ .collect(Collectors.toList());
+ files.addAll(manifestFileName);
+
+ // try to read data files
+ List<String> dataFiles = retryReadingDataFiles(manifestFileName);
+ if (dataFiles == null) {
+ return Collections.emptyList();
+ }
+ files.addAll(dataFiles);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ return files;
+ }
+
/**
* If getting null when reading some files, the snapshot/tag is being
deleted, so just return an
* empty result.
@@ -415,6 +476,7 @@ public class OrphanFilesClean {
.forEach(
p -> {
deleteFileOrDirQuietly(p);
+ deleteFiles.add(p);
deletedFilesNum++;
});
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
index 3a255ad09..90801caf0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
@@ -138,7 +138,7 @@ public class RollbackHelper {
return Collections.emptyList();
}
- // delete snapshot files first, cannot be read now
+ // delete changelog files first, cannot be read now
// it is possible that some snapshots have been expired
List<Changelog> toBeCleaned = new ArrayList<>();
long to = Math.max(earliest, retainedSnapshot.id() + 1);
@@ -157,6 +157,21 @@ public class RollbackHelper {
// delete directories
snapshotDeletion.cleanDataDirectories();
+ // modify the latest hint
+ try {
+ if (toBeCleaned.size() > 0) {
+ if (to == earliest) {
+ // all changelog has been cleaned, so we do not know the
actual latest id
+ // set to -1
+ snapshotManager.commitLongLivedChangelogLatestHint(-1);
+ } else {
+ snapshotManager.commitLongLivedChangelogLatestHint(to - 1);
+ }
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
return toBeCleaned;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 834836dcb..7c5ccd28c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -318,6 +318,13 @@ public class SnapshotManager implements Serializable {
.iterator();
}
+ public Iterator<Changelog> changelogs() throws IOException {
+ return listVersionedFiles(fileIO, changelogDirectory(),
CHANGELOG_PREFIX)
+ .map(this::changelog)
+ .sorted(Comparator.comparingLong(Changelog::id))
+ .iterator();
+ }
+
/**
* If {@link FileNotFoundException} is thrown when reading the snapshot
file, this snapshot may
* be deleted by other processes, so just skip this snapshot.
@@ -336,13 +343,40 @@ public class SnapshotManager implements Serializable {
return snapshots;
}
+ public List<Changelog> safelyGetAllChangelogs() throws IOException {
+ List<Path> paths =
+ listVersionedFiles(fileIO, changelogDirectory(),
CHANGELOG_PREFIX)
+ .map(this::longLivedChangelogPath)
+ .collect(Collectors.toList());
+
+ List<Changelog> changelogs = new ArrayList<>();
+ for (Path path : paths) {
+ try {
+ String json = fileIO.readFileUtf8(path);
+ changelogs.add(Changelog.fromJson(json));
+ } catch (FileNotFoundException ignored) {
+ }
+ }
+
+ return changelogs;
+ }
+
/**
* Try to get non snapshot files. If any error occurred, just ignore it
and return an empty
* result.
*/
public List<Path> tryGetNonSnapshotFiles(Predicate<FileStatus>
fileStatusFilter) {
+ return listPathWithFilter(snapshotDirectory(), fileStatusFilter,
nonSnapshotFileFilter());
+ }
+
+ public List<Path> tryGetNonChangelogFiles(Predicate<FileStatus>
fileStatusFilter) {
+ return listPathWithFilter(changelogDirectory(), fileStatusFilter,
nonChangelogFileFilter());
+ }
+
+ private List<Path> listPathWithFilter(
+ Path directory, Predicate<FileStatus> fileStatusFilter,
Predicate<Path> fileFilter) {
try {
- FileStatus[] statuses = fileIO.listStatus(snapshotDirectory());
+ FileStatus[] statuses = fileIO.listStatus(directory);
if (statuses == null) {
return Collections.emptyList();
}
@@ -350,7 +384,7 @@ public class SnapshotManager implements Serializable {
return Arrays.stream(statuses)
.filter(fileStatusFilter)
.map(FileStatus::getPath)
- .filter(nonSnapshotFileFilter())
+ .filter(fileFilter)
.collect(Collectors.toList());
} catch (IOException ignored) {
return Collections.emptyList();
@@ -366,6 +400,15 @@ public class SnapshotManager implements Serializable {
};
}
+ private Predicate<Path> nonChangelogFileFilter() {
+ return path -> {
+ String name = path.getName();
+ return !name.startsWith(CHANGELOG_PREFIX)
+ && !name.equals(EARLIEST)
+ && !name.equals(LATEST);
+ };
+ }
+
public Optional<Snapshot> latestSnapshotOfUser(String user) {
Long latestId = latestSnapshotId();
if (latestId == null) {
@@ -472,7 +515,7 @@ public class SnapshotManager implements Serializable {
}
Long snapshotId = readHint(LATEST, dir);
- if (snapshotId != null) {
+ if (snapshotId != null && snapshotId > 0) {
long nextSnapshot = snapshotId + 1;
// it is the latest only there is no next one
if (!fileIO.exists(file.apply(nextSnapshot))) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
index c20cdf21d..c5bd22dc5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.operation;
+import org.apache.paimon.Changelog;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
@@ -41,6 +42,7 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.table.source.InnerStreamTableScan;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -48,9 +50,11 @@ import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.StringUtils;
+import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -67,6 +71,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
@@ -104,7 +109,7 @@ public class OrphanFilesCleanTest {
DataTypes.INT(), DataTypes.INT(),
DataTypes.STRING(), DataTypes.STRING()
},
new String[] {"pk", "part1", "part2", "value"});
- table = createFileStoreTable(rowType);
+ table = createFileStoreTable(rowType, new Options());
String commitUser = UUID.randomUUID().toString();
write = table.newWrite(commitUser);
commit = table.newCommit(commitUser);
@@ -126,36 +131,9 @@ public class OrphanFilesCleanTest {
int commitTimes = 30;
List<List<TestPojo>> committedData = new ArrayList<>();
Map<Long, List<TestPojo>> snapshotData = new HashMap<>();
- SnapshotManager snapshotManager = table.snapshotManager();
-
- // generate data of first snapshot
- List<TestPojo> data = generateData();
- commit(data);
- committedData.add(data);
- recordSnapshotData(data, snapshotData, snapshotManager);
-
- // randomly generate data
- for (int i = 1; i <= commitTimes; i++) {
- List<TestPojo> previous =
- new
ArrayList<>(snapshotData.get(snapshotManager.latestSnapshotId()));
- // randomly update
- if (RANDOM.nextBoolean()) {
- List<TestPojo> toBeUpdated = randomlyPick(previous);
- List<TestPojo> updateAfter = commitUpdate(toBeUpdated);
- committedData.add(updateAfter);
-
- previous.removeAll(toBeUpdated);
- previous.addAll(updateAfter);
- recordSnapshotData(previous, snapshotData, snapshotManager);
- } else {
- List<TestPojo> current = generateData();
- commit(current);
- committedData.add(current);
- current.addAll(previous);
- recordSnapshotData(current, snapshotData, snapshotManager);
- }
- }
+ SnapshotManager snapshotManager = table.snapshotManager();
+ writeData(snapshotManager, committedData, snapshotData, new
HashMap<>(), commitTimes);
// randomly create tags
List<String> allTags = new ArrayList<>();
@@ -169,24 +147,8 @@ public class OrphanFilesCleanTest {
}
// generate non used files
- int shouldBeDeleted = 0;
- int fileNum = RANDOM.nextInt(10);
- fileNum = fileNum == 0 ? 1 : fileNum;
-
- // snapshot
- addNonUsedFiles(
- new Path(tablePath, "snapshot"), fileNum,
Collections.singletonList("UNKNOWN"));
- shouldBeDeleted += fileNum;
-
- // data files
- shouldBeDeleted += randomlyAddNonUsedDataFiles();
-
- // manifests
- addNonUsedFiles(
- manifestDir,
- fileNum,
- Arrays.asList("manifest-list-", "manifest-",
"index-manifest-", "UNKNOWN-"));
- shouldBeDeleted += fileNum;
+ int shouldBeDeleted = generateUnUsedFile();
+ assertThat(manuallyAddedFiles.size()).isEqualTo(shouldBeDeleted);
// randomly expire snapshots
int expired = RANDOM.nextInt(snapshotCount / 2);
@@ -213,9 +175,9 @@ public class OrphanFilesCleanTest {
// second check
orphanFilesClean = new OrphanFilesClean(table);
setOlderThan(orphanFilesClean);
- int deletedActual = orphanFilesClean.clean();
+ orphanFilesClean.clean();
try {
- validate(deletedActual, shouldBeDeleted, snapshotData);
+ validate(orphanFilesClean.getDeleteFiles(), snapshotData, new
HashMap<>());
} catch (Throwable t) {
String tableOptions = "Table options:\n" + table.options();
@@ -250,9 +212,20 @@ public class OrphanFilesCleanTest {
}
private void validate(
- int actualDeleted, int shouldBeDeleted, Map<Long, List<TestPojo>>
snapshotData)
+ List<Path> deleteFiles,
+ Map<Long, List<TestPojo>> snapshotData,
+ Map<Long, List<InternalRow>> changelogData)
throws Exception {
- assertThat(actualDeleted).isEqualTo(shouldBeDeleted);
+ assertThat(
+ deleteFiles.stream()
+ .map(p -> p.toUri().getPath())
+ .sorted()
+ .collect(Collectors.joining("\n")))
+ .isEqualTo(
+ manuallyAddedFiles.stream()
+ .map(p -> p.toUri().getPath())
+ .sorted()
+ .collect(Collectors.joining("\n")));
Set<Snapshot> snapshots = new HashSet<>();
table.snapshotManager().snapshots().forEachRemaining(snapshots::add);
@@ -269,6 +242,72 @@ public class OrphanFilesCleanTest {
throw new Exception("Failed to validate snapshot " +
snapshot.id(), e);
}
}
+ // validate changelog
+ if (table.coreOptions().changelogProducer() ==
CoreOptions.ChangelogProducer.INPUT) {
+ List<Changelog> changelogs = new ArrayList<>();
+ table.snapshotManager().changelogs().forEachRemaining(changelogs::add);
+ validateChangelog(
+ changelogs.stream()
+ .sorted(Comparator.comparingLong(Changelog::id))
+ .collect(Collectors.toList()),
+ changelogData);
+ }
+ }
+
+ private void validateChangelog(
+ List<Changelog> changelogs, Map<Long, List<InternalRow>>
changelogData)
+ throws Exception {
+ Preconditions.checkArgument(!changelogs.isEmpty(), "The changelogs
should not be empty!");
+ FileStoreTable scanTable =
+ table.copy(
+ Collections.singletonMap(
+ CoreOptions.SCAN_SNAPSHOT_ID.key(),
+ String.valueOf(changelogs.get(0).id())));
+ Long max =
+ changelogData.keySet().stream()
+ .max(Comparator.comparingLong(Long::longValue))
+ .get();
+ InnerStreamTableScan scan = scanTable.newStreamScan();
+ TreeMap<Long, List<InternalRow>> data = new TreeMap<>(changelogData);
+ // clear the data < the smallest changelog data.
+ data.headMap(changelogs.get(0).id()).clear();
+
+ // initial plan
+ scan.plan();
+ Long id = changelogs.get(0).id();
+ while (id <= max) {
+ List<Split> splits = scan.plan().splits();
+ if (!splits.isEmpty()) {
+ List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers =
new ArrayList<>();
+ for (Split split : splits) {
+ readers.add(() -> scanTable.newRead().createReader(split));
+ }
+ RecordReader<InternalRow> recordReader =
ConcatRecordReader.create(readers);
+ RecordReaderIterator<InternalRow> iterator =
+ new RecordReaderIterator<>(recordReader);
+ List<String> result = new ArrayList<>();
+ while (iterator.hasNext()) {
+ InternalRow rowData = iterator.next();
+ result.add(DataFormatTestUtil.internalRowToString(rowData,
rowType));
+ }
+ iterator.close();
+ id = scan.checkpoint();
+
+ List<InternalRow> batch = data.remove(id - 1);
+ assertThat(result.stream().sorted().collect(Collectors.joining("\n")))
+ .isEqualTo(
+ batch.stream()
+ .map(
+ d ->
+ DataFormatTestUtil.internalRowToString(
+ d, rowType))
+ .sorted()
+ .collect(Collectors.joining("\n")));
+ } else {
+ id = scan.checkpoint();
+ }
+ }
+ Assertions.assertThat(data.values().stream().allMatch(List::isEmpty)).isTrue();
}
private void validateSnapshot(Snapshot snapshot, List<TestPojo> data)
throws Exception {
@@ -289,6 +328,45 @@ public class OrphanFilesCleanTest {
assertThat(result).containsExactlyInAnyOrderElementsOf(TestPojo.formatData(data));
}
+ @Test
+ public void testCleanOrphanFilesWithChangelogDecoupled() throws Exception {
+ // recreate the table with another option
+ this.write.close();
+ this.commit.close();
+ int commitTimes = 30;
+ Options options = new Options();
+ options.set(CoreOptions.CHANGELOG_PRODUCER,
CoreOptions.ChangelogProducer.INPUT);
+ options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 15);
+ options.set(CoreOptions.CHANGELOG_NUM_RETAINED_MAX, 20);
+ FileStoreTable table = createFileStoreTable(rowType, options);
+ String commitUser = UUID.randomUUID().toString();
+ this.table = table;
+ write = table.newWrite(commitUser);
+ commit = table.newCommit(commitUser);
+
+ List<List<TestPojo>> committedData = new ArrayList<>();
+ Map<Long, List<TestPojo>> snapshotData = new HashMap<>();
+ Map<Long, List<InternalRow>> changelogData = new HashMap<>();
+
+ SnapshotManager snapshotManager = table.snapshotManager();
+ writeData(snapshotManager, committedData, snapshotData, changelogData,
commitTimes);
+
+ // generate non used files
+ int shouldBeDeleted = generateUnUsedFile();
+ assertThat(manuallyAddedFiles.size()).isEqualTo(shouldBeDeleted);
+
+ // first check, nothing will be deleted because the default olderThan
interval is 1 day
+ OrphanFilesClean orphanFilesClean = new OrphanFilesClean(table);
+ assertThat(orphanFilesClean.clean()).isEqualTo(0);
+
+ // second check
+ orphanFilesClean = new OrphanFilesClean(table);
+ setOlderThan(orphanFilesClean);
+ orphanFilesClean.clean();
+ List<Path> cleanFiles = orphanFilesClean.getDeleteFiles();
+ validate(cleanFiles, snapshotData, changelogData);
+ }
+
/** Manually make a FileNotFoundException to simulate snapshot expire
while clean. */
@Test
public void testAbnormallyRemoving() throws Exception {
@@ -317,6 +395,75 @@ public class OrphanFilesCleanTest {
assertThat(orphanFilesClean.clean()).isGreaterThan(0);
}
+ private void writeData(
+ SnapshotManager snapshotManager,
+ List<List<TestPojo>> committedData,
+ Map<Long, List<TestPojo>> snapshotData,
+ Map<Long, List<InternalRow>> changelogData,
+ int commitTimes)
+ throws Exception {
+ // first snapshot
+ List<TestPojo> data = generateData();
+ commit(data);
+ committedData.add(data);
+ recordSnapshotData(data, snapshotData, snapshotManager);
+ recordChangelogData(new ArrayList<>(), data, changelogData,
snapshotManager);
+
+ // randomly generate data
+ for (int i = 1; i <= commitTimes; i++) {
+ List<TestPojo> previous =
+ new
ArrayList<>(snapshotData.get(snapshotManager.latestSnapshotId()));
+ // randomly update
+ if (RANDOM.nextBoolean()) {
+ List<TestPojo> toBeUpdated = randomlyPick(previous);
+ List<TestPojo> updateAfter = commitUpdate(toBeUpdated);
+ committedData.add(updateAfter);
+
+ previous.removeAll(toBeUpdated);
+ previous.addAll(updateAfter);
+ recordSnapshotData(previous, snapshotData, snapshotManager);
+ recordChangelogData(toBeUpdated, updateAfter, changelogData,
snapshotManager);
+ } else {
+ List<TestPojo> current = generateData();
+ commit(current);
+ committedData.add(current);
+
+ recordChangelogData(new ArrayList<>(), current, changelogData,
snapshotManager);
+ current.addAll(previous);
+ recordSnapshotData(current, snapshotData, snapshotManager);
+ }
+ }
+ }
+
+ private int generateUnUsedFile() throws Exception {
+ int shouldBeDeleted = 0;
+ int fileNum = RANDOM.nextInt(10);
+ fileNum = fileNum == 0 ? 1 : fileNum;
+
+ // snapshot
+ addNonUsedFiles(
+ new Path(tablePath, "snapshot"), fileNum,
Collections.singletonList("UNKNOWN"));
+
+ shouldBeDeleted += fileNum;
+
+ // changelog
+ addNonUsedFiles(
+ new Path(tablePath, "changelog"), fileNum,
Collections.singletonList("UNKNOWN"));
+
+ shouldBeDeleted += fileNum;
+
+ // data files
+ shouldBeDeleted += randomlyAddNonUsedDataFiles();
+
+ // manifests
+ addNonUsedFiles(
+ manifestDir,
+ fileNum,
+ Arrays.asList("manifest-list-", "manifest-",
"index-manifest-", "UNKNOWN-"));
+ shouldBeDeleted += fileNum;
+ return shouldBeDeleted;
+ }
+
private void setOlderThan(OrphanFilesClean orphanFilesClean) {
String timestamp =
DateTimeUtils.formatLocalDateTime(
@@ -367,6 +514,32 @@ public class OrphanFilesCleanTest {
snapshotData.put(latest.id(), data);
}
+ private void recordChangelogData(
+ List<TestPojo> updateBefore,
+ List<TestPojo> updateAfter,
+ Map<Long, List<InternalRow>> changelogData,
+ SnapshotManager snapshotManager) {
+ Snapshot latest = snapshotManager.latestSnapshot();
+ boolean isInsert = updateBefore.isEmpty();
+ if (table.coreOptions().changelogProducer() ==
CoreOptions.ChangelogProducer.INPUT) {
+ List<InternalRow> data = new ArrayList<>();
+ for (TestPojo testPojo : updateBefore) {
+ data.add(testPojo.toRow(RowKind.UPDATE_BEFORE));
+ }
+ for (TestPojo testPojo : updateAfter) {
+ data.add(testPojo.toRow(isInsert ? RowKind.INSERT :
RowKind.UPDATE_AFTER));
+ }
+ if (latest.commitKind() != Snapshot.CommitKind.COMPACT) {
+ changelogData.put(latest.id(), data);
+ } else {
+ changelogData.put(latest.id() - 1, data);
+ changelogData.put(latest.id(), new ArrayList<>());
+ }
+ } else {
+ changelogData.put(latest.id(), new ArrayList<>());
+ }
+ }
+
private int randomlyAddNonUsedDataFiles() throws IOException {
int addedFiles = 0;
List<Path> part1 = listSubDirs(tablePath, p ->
p.getName().contains("="));
@@ -467,6 +640,22 @@ public class OrphanFilesCleanTest {
return new TestPojo(pk, part1, part2, randomValue());
}
+ @Override
+ public String toString() {
+ return "TestPojo{"
+ + "pk="
+ + pk
+ + ", part1="
+ + part1
+ + ", part2='"
+ + part2
+ + '\''
+ + ", value='"
+ + value
+ + '\''
+ + '}';
+ }
+
public static void reset() {
increaseKey = 0;
}
@@ -493,8 +682,7 @@ public class OrphanFilesCleanTest {
return StringUtils.getRandomString(RANDOM, 5, 20, 'a', 'z');
}
- private FileStoreTable createFileStoreTable(RowType rowType) throws
Exception {
- Options conf = new Options();
+ private FileStoreTable createFileStoreTable(RowType rowType, Options conf)
throws Exception {
conf.set(CoreOptions.PATH, tablePath.toString());
conf.set(CoreOptions.BUCKET, RANDOM.nextInt(3) + 1);
TableSchema tableSchema =
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index 6123a9e99..138a30d5b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -845,6 +845,7 @@ public abstract class FileStoreTableTestBase {
Options options = new Options();
options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 5);
options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 5);
+ options.set(SNAPSHOT_EXPIRE_LIMIT, Integer.MAX_VALUE);
options.set(CHANGELOG_NUM_RETAINED_MIN, 5);
options.set(CHANGELOG_NUM_RETAINED_MAX, 5);
table.copy(options.toMap()).newCommit("").expireSnapshots();
@@ -881,6 +882,12 @@ public abstract class FileStoreTableTestBase {
private FileStoreTable prepareRollbackTable(int commitTimes) throws
Exception {
FileStoreTable table = createFileStoreTable();
+ prepareRollbackTable(commitTimes, table);
+ return table;
+ }
+
+ protected FileStoreTable prepareRollbackTable(int commitTimes,
FileStoreTable table)
+ throws Exception {
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index db6e12305..a1fc6f0b0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -66,6 +66,8 @@ import org.apache.paimon.utils.CompatibilityTestUtils;
import org.junit.jupiter.api.Test;
+import java.io.File;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -73,14 +75,19 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MAX;
+import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MIN;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
+import static org.apache.paimon.CoreOptions.SNAPSHOT_EXPIRE_LIMIT;
import static org.apache.paimon.CoreOptions.TARGET_FILE_SIZE;
import static org.apache.paimon.Snapshot.CommitKind.COMPACT;
import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
@@ -1371,6 +1378,79 @@ public class PrimaryKeyFileStoreTableTest extends FileStoreTableTestBase {
"1|2|200|binary|varbinary|mapKey:mapVal|multiset"));
}
+ @Test
+ public void testRollbackToTagWithChangelogDecoupled() throws Exception {
+ int commitTimes = ThreadLocalRandom.current().nextInt(100) + 5;
+ FileStoreTable table =
+ createFileStoreTable(
+ options ->
+ options.set(
+ CoreOptions.CHANGELOG_PRODUCER,
+ CoreOptions.ChangelogProducer.INPUT));
+ prepareRollbackTable(commitTimes, table);
+
+ int t1 = 1;
+ int t2 = commitTimes - 3;
+ int t3 = commitTimes - 1;
+ table.createTag("test1", t1);
+ table.createTag("test2", t2);
+ table.createTag("test3", t3);
+
+ // expire snapshots
+ Options options = new Options();
+ options.set(SNAPSHOT_EXPIRE_LIMIT, Integer.MAX_VALUE);
+ // -------------changelog--------- |-------snapshot------|
+ // s(1)test1 -------- s(c - 3)test2 --- s(c - 1) test3
+ options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 2);
+ options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 2);
+ options.set(SNAPSHOT_EXPIRE_LIMIT, Integer.MAX_VALUE);
+ options.set(CHANGELOG_NUM_RETAINED_MIN, 5);
+ options.set(CHANGELOG_NUM_RETAINED_MAX, 5);
+ table.copy(options.toMap()).newCommit("").expireSnapshots();
+
+ table.rollbackTo("test3");
+ assertReadChangelog(t3, table);
+
+ table.rollbackTo("test2");
+ assertReadChangelog(t2, table);
+
+ table.rollbackTo("test1");
+
+ List<java.nio.file.Path> files =
+ Files.walk(new File(tablePath.toUri().getPath()).toPath())
+ .collect(Collectors.toList());
+ assertThat(files.size()).isEqualTo(19);
+ // rollback snapshot case testRollbackToSnapshotCase0 plus 4:
+ // table-path/tag/tag-test1
+ // table-path/changelog
+ // table-path/changelog/LATEST
+ // table-path/changelog/EARLIEST
+ }
+
+ private void assertReadChangelog(int id, FileStoreTable table) throws
Exception {
+ // read the changelog at #{id}
+ table =
+ table.copy(
+ Collections.singletonMap(
+ CoreOptions.SCAN_SNAPSHOT_ID.key(),
String.valueOf(id)));
+ ReadBuilder readBuilder = table.newReadBuilder();
+ StreamTableScan scan = readBuilder.newStreamScan();
+
+ // skip the NextSnapshot
+ scan.plan();
+ List<String> results =
+ getResult(readBuilder.newRead(), scan.plan().splits(),
BATCH_ROW_TO_STRING);
+ if (id == 1) {
+ assertThat(results).isEmpty();
+ } else {
+ assertThat(results)
+ .containsExactlyInAnyOrder(
+ String.format(
+ "%s|%s|%s|binary|varbinary|mapKey:mapVal|multiset",
+ id - 1, (id - 1) * 10, (id - 1) * 100));
+ }
+ }
+
private void innerTestTableQuery(FileStoreTable table) throws Exception {
IOManager ioManager = IOManager.create(tablePath.toString());
StreamTableWrite write =
table.newWrite(commitUser).withIOManager(ioManager);