This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 478f1670a [core] Support the rollback and orphan file clean with changelog decouple (#3144)
478f1670a is described below

commit 478f1670a829590880676cb5d76747af5514e0a1
Author: Aitozi <[email protected]>
AuthorDate: Sun Apr 7 10:40:14 2024 +0800

    [core] Support the rollback and orphan file clean with changelog decouple (#3144)
---
 .../apache/paimon/operation/OrphanFilesClean.java  |  66 ++++-
 .../org/apache/paimon/table/RollbackHelper.java    |  17 +-
 .../org/apache/paimon/utils/SnapshotManager.java   |  49 +++-
 .../paimon/operation/OrphanFilesCleanTest.java     | 296 +++++++++++++++++----
 .../paimon/table/FileStoreTableTestBase.java       |   7 +
 .../paimon/table/PrimaryKeyFileStoreTableTest.java |  80 ++++++
 6 files changed, 455 insertions(+), 60 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index 13ae240b6..b68cf8cc7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -18,8 +18,10 @@
 
 package org.apache.paimon.operation;
 
+import org.apache.paimon.Changelog;
 import org.apache.paimon.FileStore;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
@@ -91,6 +93,7 @@ public class OrphanFilesClean {
 
     // an estimated value of how many files were deleted
     private int deletedFilesNum = 0;
+    private final List<Path> deleteFiles;
     private long olderThanMillis = System.currentTimeMillis() - 
TimeUnit.DAYS.toMillis(1);
 
     public OrphanFilesClean(FileStoreTable table) {
@@ -104,6 +107,7 @@ public class OrphanFilesClean {
         this.manifestList = store.manifestListFactory().create();
         this.manifestFile = store.manifestFileFactory().create();
         this.indexFileHandler = store.newIndexFileHandler();
+        this.deleteFiles = new ArrayList<>();
     }
 
     public OrphanFilesClean olderThan(String timestamp) {
@@ -124,6 +128,13 @@ public class OrphanFilesClean {
         List<Path> nonSnapshotFiles = 
snapshotManager.tryGetNonSnapshotFiles(this::oldEnough);
         nonSnapshotFiles.forEach(this::deleteFileOrDirQuietly);
         deletedFilesNum += nonSnapshotFiles.size();
+        deleteFiles.addAll(nonSnapshotFiles);
+
+        // specially handle the changelog directory
+        List<Path> nonChangelogFiles = 
snapshotManager.tryGetNonChangelogFiles(this::oldEnough);
+        nonChangelogFiles.forEach(this::deleteFileOrDirQuietly);
+        deletedFilesNum += nonChangelogFiles.size();
+        deleteFiles.addAll(nonChangelogFiles);
 
         Map<String, Path> candidates = getCandidateDeletingFiles();
         Set<String> usedFiles = getUsedFiles();
@@ -136,10 +147,16 @@ public class OrphanFilesClean {
             deleteFileOrDirQuietly(path);
         }
         deletedFilesNum += deleted.size();
+        
deleteFiles.addAll(deleted.stream().map(candidates::get).collect(Collectors.toList()));
 
         return deletedFilesNum;
     }
 
+    @VisibleForTesting
+    List<Path> getDeleteFiles() {
+        return deleteFiles;
+    }
+
     /** Get all the files used by snapshots and tags. */
     private Set<String> getUsedFiles()
             throws IOException, ExecutionException, InterruptedException {
@@ -147,6 +164,7 @@ public class OrphanFilesClean {
         Set<Snapshot> readSnapshots = new 
HashSet<>(snapshotManager.safelyGetAllSnapshots());
         List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
         readSnapshots.addAll(taggedSnapshots);
+        readSnapshots.addAll(snapshotManager.safelyGetAllChangelogs());
 
         return FileUtils.COMMON_IO_FORK_JOIN_POOL
                 .submit(
@@ -154,8 +172,16 @@ public class OrphanFilesClean {
                                 readSnapshots
                                         .parallelStream()
                                         .flatMap(
-                                                snapshot ->
-                                                        
getUsedFilesForSnapshot(snapshot).stream())
+                                                snapshot -> {
+                                                    if (snapshot instanceof 
Changelog) {
+                                                        return 
getUsedFilesForChangelog(
+                                                                (Changelog) 
snapshot)
+                                                                .stream();
+                                                    } else {
+                                                        return 
getUsedFilesForSnapshot(snapshot)
+                                                                .stream();
+                                                    }
+                                                })
                                         .collect(Collectors.toSet()))
                 .get();
     }
@@ -184,6 +210,41 @@ public class OrphanFilesClean {
         }
     }
 
+    private List<String> getUsedFilesForChangelog(Changelog changelog) {
+        List<String> files = new ArrayList<>();
+        if (changelog.changelogManifestList() != null) {
+            files.add(changelog.changelogManifestList());
+        }
+
+        try {
+            // try to read manifests
+            List<ManifestFileMeta> manifestFileMetas =
+                    retryReadingFiles(
+                            () ->
+                                    manifestList.readWithIOException(
+                                            
changelog.changelogManifestList()));
+            if (manifestFileMetas == null) {
+                return Collections.emptyList();
+            }
+            List<String> manifestFileName =
+                    manifestFileMetas.stream()
+                            .map(ManifestFileMeta::fileName)
+                            .collect(Collectors.toList());
+            files.addAll(manifestFileName);
+
+            // try to read data files
+            List<String> dataFiles = retryReadingDataFiles(manifestFileName);
+            if (dataFiles == null) {
+                return Collections.emptyList();
+            }
+            files.addAll(dataFiles);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        return files;
+    }
+
     /**
      * If getting null when reading some files, the snapshot/tag is being 
deleted, so just return an
      * empty result.
@@ -415,6 +476,7 @@ public class OrphanFilesClean {
                     .forEach(
                             p -> {
                                 deleteFileOrDirQuietly(p);
+                                deleteFiles.add(p);
                                 deletedFilesNum++;
                             });
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java 
b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
index 3a255ad09..90801caf0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
@@ -138,7 +138,7 @@ public class RollbackHelper {
             return Collections.emptyList();
         }
 
-        // delete snapshot files first, cannot be read now
+        // delete changelog files first, cannot be read now
         // it is possible that some snapshots have been expired
         List<Changelog> toBeCleaned = new ArrayList<>();
         long to = Math.max(earliest, retainedSnapshot.id() + 1);
@@ -157,6 +157,21 @@ public class RollbackHelper {
         // delete directories
         snapshotDeletion.cleanDataDirectories();
 
+        // modify the latest hint
+        try {
+            if (toBeCleaned.size() > 0) {
+                if (to == earliest) {
+                    // all changelog has been cleaned, so we do not know the 
actual latest id
+                    // set to -1
+                    snapshotManager.commitLongLivedChangelogLatestHint(-1);
+                } else {
+                    snapshotManager.commitLongLivedChangelogLatestHint(to - 1);
+                }
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+
         return toBeCleaned;
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 834836dcb..7c5ccd28c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -318,6 +318,13 @@ public class SnapshotManager implements Serializable {
                 .iterator();
     }
 
+    public Iterator<Changelog> changelogs() throws IOException {
+        return listVersionedFiles(fileIO, changelogDirectory(), 
CHANGELOG_PREFIX)
+                .map(this::changelog)
+                .sorted(Comparator.comparingLong(Changelog::id))
+                .iterator();
+    }
+
     /**
      * If {@link FileNotFoundException} is thrown when reading the snapshot 
file, this snapshot may
      * be deleted by other processes, so just skip this snapshot.
@@ -336,13 +343,40 @@ public class SnapshotManager implements Serializable {
         return snapshots;
     }
 
+    public List<Changelog> safelyGetAllChangelogs() throws IOException {
+        List<Path> paths =
+                listVersionedFiles(fileIO, changelogDirectory(), 
CHANGELOG_PREFIX)
+                        .map(this::longLivedChangelogPath)
+                        .collect(Collectors.toList());
+
+        List<Changelog> changelogs = new ArrayList<>();
+        for (Path path : paths) {
+            try {
+                String json = fileIO.readFileUtf8(path);
+                changelogs.add(Changelog.fromJson(json));
+            } catch (FileNotFoundException ignored) {
+            }
+        }
+
+        return changelogs;
+    }
+
     /**
      * Try to get non snapshot files. If any error occurred, just ignore it 
and return an empty
      * result.
      */
     public List<Path> tryGetNonSnapshotFiles(Predicate<FileStatus> 
fileStatusFilter) {
+        return listPathWithFilter(snapshotDirectory(), fileStatusFilter, 
nonSnapshotFileFilter());
+    }
+
+    public List<Path> tryGetNonChangelogFiles(Predicate<FileStatus> 
fileStatusFilter) {
+        return listPathWithFilter(changelogDirectory(), fileStatusFilter, 
nonChangelogFileFilter());
+    }
+
+    private List<Path> listPathWithFilter(
+            Path directory, Predicate<FileStatus> fileStatusFilter, 
Predicate<Path> fileFilter) {
         try {
-            FileStatus[] statuses = fileIO.listStatus(snapshotDirectory());
+            FileStatus[] statuses = fileIO.listStatus(directory);
             if (statuses == null) {
                 return Collections.emptyList();
             }
@@ -350,7 +384,7 @@ public class SnapshotManager implements Serializable {
             return Arrays.stream(statuses)
                     .filter(fileStatusFilter)
                     .map(FileStatus::getPath)
-                    .filter(nonSnapshotFileFilter())
+                    .filter(fileFilter)
                     .collect(Collectors.toList());
         } catch (IOException ignored) {
             return Collections.emptyList();
@@ -366,6 +400,15 @@ public class SnapshotManager implements Serializable {
         };
     }
 
+    private Predicate<Path> nonChangelogFileFilter() {
+        return path -> {
+            String name = path.getName();
+            return !name.startsWith(CHANGELOG_PREFIX)
+                    && !name.equals(EARLIEST)
+                    && !name.equals(LATEST);
+        };
+    }
+
     public Optional<Snapshot> latestSnapshotOfUser(String user) {
         Long latestId = latestSnapshotId();
         if (latestId == null) {
@@ -472,7 +515,7 @@ public class SnapshotManager implements Serializable {
         }
 
         Long snapshotId = readHint(LATEST, dir);
-        if (snapshotId != null) {
+        if (snapshotId != null && snapshotId > 0) {
             long nextSnapshot = snapshotId + 1;
             // it is the latest only there is no next one
             if (!fileIO.exists(file.apply(nextSnapshot))) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
index c20cdf21d..c5bd22dc5 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.operation;
 
+import org.apache.paimon.Changelog;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
@@ -41,6 +42,7 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.table.source.InnerStreamTableScan;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -48,9 +50,11 @@ import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.StringUtils;
 
+import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -67,6 +71,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
@@ -104,7 +109,7 @@ public class OrphanFilesCleanTest {
                             DataTypes.INT(), DataTypes.INT(), 
DataTypes.STRING(), DataTypes.STRING()
                         },
                         new String[] {"pk", "part1", "part2", "value"});
-        table = createFileStoreTable(rowType);
+        table = createFileStoreTable(rowType, new Options());
         String commitUser = UUID.randomUUID().toString();
         write = table.newWrite(commitUser);
         commit = table.newCommit(commitUser);
@@ -126,36 +131,9 @@ public class OrphanFilesCleanTest {
         int commitTimes = 30;
         List<List<TestPojo>> committedData = new ArrayList<>();
         Map<Long, List<TestPojo>> snapshotData = new HashMap<>();
-        SnapshotManager snapshotManager = table.snapshotManager();
-
-        // generate data of first snapshot
-        List<TestPojo> data = generateData();
-        commit(data);
-        committedData.add(data);
-        recordSnapshotData(data, snapshotData, snapshotManager);
-
-        // randomly generate data
-        for (int i = 1; i <= commitTimes; i++) {
-            List<TestPojo> previous =
-                    new 
ArrayList<>(snapshotData.get(snapshotManager.latestSnapshotId()));
-            // randomly update
-            if (RANDOM.nextBoolean()) {
-                List<TestPojo> toBeUpdated = randomlyPick(previous);
-                List<TestPojo> updateAfter = commitUpdate(toBeUpdated);
-                committedData.add(updateAfter);
-
-                previous.removeAll(toBeUpdated);
-                previous.addAll(updateAfter);
-                recordSnapshotData(previous, snapshotData, snapshotManager);
-            } else {
-                List<TestPojo> current = generateData();
-                commit(current);
-                committedData.add(current);
 
-                current.addAll(previous);
-                recordSnapshotData(current, snapshotData, snapshotManager);
-            }
-        }
+        SnapshotManager snapshotManager = table.snapshotManager();
+        writeData(snapshotManager, committedData, snapshotData, new 
HashMap<>(), commitTimes);
 
         // randomly create tags
         List<String> allTags = new ArrayList<>();
@@ -169,24 +147,8 @@ public class OrphanFilesCleanTest {
         }
 
         // generate non used files
-        int shouldBeDeleted = 0;
-        int fileNum = RANDOM.nextInt(10);
-        fileNum = fileNum == 0 ? 1 : fileNum;
-
-        // snapshot
-        addNonUsedFiles(
-                new Path(tablePath, "snapshot"), fileNum, 
Collections.singletonList("UNKNOWN"));
-        shouldBeDeleted += fileNum;
-
-        // data files
-        shouldBeDeleted += randomlyAddNonUsedDataFiles();
-
-        // manifests
-        addNonUsedFiles(
-                manifestDir,
-                fileNum,
-                Arrays.asList("manifest-list-", "manifest-", 
"index-manifest-", "UNKNOWN-"));
-        shouldBeDeleted += fileNum;
+        int shouldBeDeleted = generateUnUsedFile();
+        assertThat(manuallyAddedFiles.size()).isEqualTo(shouldBeDeleted);
 
         // randomly expire snapshots
         int expired = RANDOM.nextInt(snapshotCount / 2);
@@ -213,9 +175,9 @@ public class OrphanFilesCleanTest {
         // second check
         orphanFilesClean = new OrphanFilesClean(table);
         setOlderThan(orphanFilesClean);
-        int deletedActual = orphanFilesClean.clean();
+        orphanFilesClean.clean();
         try {
-            validate(deletedActual, shouldBeDeleted, snapshotData);
+            validate(orphanFilesClean.getDeleteFiles(), snapshotData, new 
HashMap<>());
         } catch (Throwable t) {
             String tableOptions = "Table options:\n" + table.options();
 
@@ -250,9 +212,20 @@ public class OrphanFilesCleanTest {
     }
 
     private void validate(
-            int actualDeleted, int shouldBeDeleted, Map<Long, List<TestPojo>> 
snapshotData)
+            List<Path> deleteFiles,
+            Map<Long, List<TestPojo>> snapshotData,
+            Map<Long, List<InternalRow>> changelogData)
             throws Exception {
-        assertThat(actualDeleted).isEqualTo(shouldBeDeleted);
+        assertThat(
+                        deleteFiles.stream()
+                                .map(p -> p.toUri().getPath())
+                                .sorted()
+                                .collect(Collectors.joining("\n")))
+                .isEqualTo(
+                        manuallyAddedFiles.stream()
+                                .map(p -> p.toUri().getPath())
+                                .sorted()
+                                .collect(Collectors.joining("\n")));
 
         Set<Snapshot> snapshots = new HashSet<>();
         table.snapshotManager().snapshots().forEachRemaining(snapshots::add);
@@ -269,6 +242,72 @@ public class OrphanFilesCleanTest {
                 throw new Exception("Failed to validate snapshot " + 
snapshot.id(), e);
             }
         }
+        // validate changelog
+        if (table.coreOptions().changelogProducer() == 
CoreOptions.ChangelogProducer.INPUT) {
+            List<Changelog> changelogs = new ArrayList<>();
+            
table.snapshotManager().changelogs().forEachRemaining(changelogs::add);
+            validateChangelog(
+                    changelogs.stream()
+                            .sorted(Comparator.comparingLong(Changelog::id))
+                            .collect(Collectors.toList()),
+                    changelogData);
+        }
+    }
+
+    private void validateChangelog(
+            List<Changelog> changelogs, Map<Long, List<InternalRow>> 
changelogData)
+            throws Exception {
+        Preconditions.checkArgument(!changelogs.isEmpty(), "The changelogs 
should not be empty!");
+        FileStoreTable scanTable =
+                table.copy(
+                        Collections.singletonMap(
+                                CoreOptions.SCAN_SNAPSHOT_ID.key(),
+                                String.valueOf(changelogs.get(0).id())));
+        Long max =
+                changelogData.keySet().stream()
+                        .max(Comparator.comparingLong(Long::longValue))
+                        .get();
+        InnerStreamTableScan scan = scanTable.newStreamScan();
+        TreeMap<Long, List<InternalRow>> data = new TreeMap<>(changelogData);
+        // clear the data < the smallest changelog data.
+        data.headMap(changelogs.get(0).id()).clear();
+
+        // initial plan
+        scan.plan();
+        Long id = changelogs.get(0).id();
+        while (id <= max) {
+            List<Split> splits = scan.plan().splits();
+            if (!splits.isEmpty()) {
+                List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = 
new ArrayList<>();
+                for (Split split : splits) {
+                    readers.add(() -> scanTable.newRead().createReader(split));
+                }
+                RecordReader<InternalRow> recordReader = 
ConcatRecordReader.create(readers);
+                RecordReaderIterator<InternalRow> iterator =
+                        new RecordReaderIterator<>(recordReader);
+                List<String> result = new ArrayList<>();
+                while (iterator.hasNext()) {
+                    InternalRow rowData = iterator.next();
+                    result.add(DataFormatTestUtil.internalRowToString(rowData, 
rowType));
+                }
+                iterator.close();
+                id = scan.checkpoint();
+
+                List<InternalRow> batch = data.remove(id - 1);
+                
assertThat(result.stream().sorted().collect(Collectors.joining("\n")))
+                        .isEqualTo(
+                                batch.stream()
+                                        .map(
+                                                d ->
+                                                        
DataFormatTestUtil.internalRowToString(
+                                                                d, rowType))
+                                        .sorted()
+                                        .collect(Collectors.joining("\n")));
+            } else {
+                id = scan.checkpoint();
+            }
+        }
+        
Assertions.assertThat(data.values().stream().allMatch(List::isEmpty)).isTrue();
     }
 
     private void validateSnapshot(Snapshot snapshot, List<TestPojo> data) 
throws Exception {
@@ -289,6 +328,45 @@ public class OrphanFilesCleanTest {
         
assertThat(result).containsExactlyInAnyOrderElementsOf(TestPojo.formatData(data));
     }
 
+    @Test
+    public void testCleanOrphanFilesWithChangelogDecoupled() throws Exception {
+        // recreate the table with another option
+        this.write.close();
+        this.commit.close();
+        int commitTimes = 30;
+        Options options = new Options();
+        options.set(CoreOptions.CHANGELOG_PRODUCER, 
CoreOptions.ChangelogProducer.INPUT);
+        options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 15);
+        options.set(CoreOptions.CHANGELOG_NUM_RETAINED_MAX, 20);
+        FileStoreTable table = createFileStoreTable(rowType, options);
+        String commitUser = UUID.randomUUID().toString();
+        this.table = table;
+        write = table.newWrite(commitUser);
+        commit = table.newCommit(commitUser);
+
+        List<List<TestPojo>> committedData = new ArrayList<>();
+        Map<Long, List<TestPojo>> snapshotData = new HashMap<>();
+        Map<Long, List<InternalRow>> changelogData = new HashMap<>();
+
+        SnapshotManager snapshotManager = table.snapshotManager();
+        writeData(snapshotManager, committedData, snapshotData, changelogData, 
commitTimes);
+
+        // generate non used files
+        int shouldBeDeleted = generateUnUsedFile();
+        assertThat(manuallyAddedFiles.size()).isEqualTo(shouldBeDeleted);
+
+        // first check, nothing will be deleted because the default olderThan 
interval is 1 day
+        OrphanFilesClean orphanFilesClean = new OrphanFilesClean(table);
+        assertThat(orphanFilesClean.clean()).isEqualTo(0);
+
+        // second check
+        orphanFilesClean = new OrphanFilesClean(table);
+        setOlderThan(orphanFilesClean);
+        orphanFilesClean.clean();
+        List<Path> cleanFiles = orphanFilesClean.getDeleteFiles();
+        validate(cleanFiles, snapshotData, changelogData);
+    }
+
     /** Manually make a FileNotFoundException to simulate snapshot expire 
while clean. */
     @Test
     public void testAbnormallyRemoving() throws Exception {
@@ -317,6 +395,75 @@ public class OrphanFilesCleanTest {
         assertThat(orphanFilesClean.clean()).isGreaterThan(0);
     }
 
+    private void writeData(
+            SnapshotManager snapshotManager,
+            List<List<TestPojo>> committedData,
+            Map<Long, List<TestPojo>> snapshotData,
+            Map<Long, List<InternalRow>> changelogData,
+            int commitTimes)
+            throws Exception {
+        // first snapshot
+        List<TestPojo> data = generateData();
+        commit(data);
+        committedData.add(data);
+        recordSnapshotData(data, snapshotData, snapshotManager);
+        recordChangelogData(new ArrayList<>(), data, changelogData, 
snapshotManager);
+
+        // randomly generate data
+        for (int i = 1; i <= commitTimes; i++) {
+            List<TestPojo> previous =
+                    new 
ArrayList<>(snapshotData.get(snapshotManager.latestSnapshotId()));
+            // randomly update
+            if (RANDOM.nextBoolean()) {
+                List<TestPojo> toBeUpdated = randomlyPick(previous);
+                List<TestPojo> updateAfter = commitUpdate(toBeUpdated);
+                committedData.add(updateAfter);
+
+                previous.removeAll(toBeUpdated);
+                previous.addAll(updateAfter);
+                recordSnapshotData(previous, snapshotData, snapshotManager);
+                recordChangelogData(toBeUpdated, updateAfter, changelogData, 
snapshotManager);
+            } else {
+                List<TestPojo> current = generateData();
+                commit(current);
+                committedData.add(current);
+
+                recordChangelogData(new ArrayList<>(), current, changelogData, 
snapshotManager);
+                current.addAll(previous);
+                recordSnapshotData(current, snapshotData, snapshotManager);
+            }
+        }
+    }
+
+    private int generateUnUsedFile() throws Exception {
+        int shouldBeDeleted = 0;
+        int fileNum = RANDOM.nextInt(10);
+        fileNum = fileNum == 0 ? 1 : fileNum;
+
+        // snapshot
+        addNonUsedFiles(
+                new Path(tablePath, "snapshot"), fileNum, 
Collections.singletonList("UNKNOWN"));
+
+        shouldBeDeleted += fileNum;
+
+        // changelog
+        addNonUsedFiles(
+                new Path(tablePath, "changelog"), fileNum, 
Collections.singletonList("UNKNOWN"));
+
+        shouldBeDeleted += fileNum;
+
+        // data files
+        shouldBeDeleted += randomlyAddNonUsedDataFiles();
+
+        // manifests
+        addNonUsedFiles(
+                manifestDir,
+                fileNum,
+                Arrays.asList("manifest-list-", "manifest-", 
"index-manifest-", "UNKNOWN-"));
+        shouldBeDeleted += fileNum;
+        return shouldBeDeleted;
+    }
+
     private void setOlderThan(OrphanFilesClean orphanFilesClean) {
         String timestamp =
                 DateTimeUtils.formatLocalDateTime(
@@ -367,6 +514,32 @@ public class OrphanFilesCleanTest {
         snapshotData.put(latest.id(), data);
     }
 
+    private void recordChangelogData(
+            List<TestPojo> updateBefore,
+            List<TestPojo> updateAfter,
+            Map<Long, List<InternalRow>> changelogData,
+            SnapshotManager snapshotManager) {
+        Snapshot latest = snapshotManager.latestSnapshot();
+        boolean isInsert = updateBefore.isEmpty();
+        if (table.coreOptions().changelogProducer() == 
CoreOptions.ChangelogProducer.INPUT) {
+            List<InternalRow> data = new ArrayList<>();
+            for (TestPojo testPojo : updateBefore) {
+                data.add(testPojo.toRow(RowKind.UPDATE_BEFORE));
+            }
+            for (TestPojo testPojo : updateAfter) {
+                data.add(testPojo.toRow(isInsert ? RowKind.INSERT : 
RowKind.UPDATE_AFTER));
+            }
+            if (latest.commitKind() != Snapshot.CommitKind.COMPACT) {
+                changelogData.put(latest.id(), data);
+            } else {
+                changelogData.put(latest.id() - 1, data);
+                changelogData.put(latest.id(), new ArrayList<>());
+            }
+        } else {
+            changelogData.put(latest.id(), new ArrayList<>());
+        }
+    }
+
     private int randomlyAddNonUsedDataFiles() throws IOException {
         int addedFiles = 0;
         List<Path> part1 = listSubDirs(tablePath, p -> 
p.getName().contains("="));
@@ -467,6 +640,22 @@ public class OrphanFilesCleanTest {
             return new TestPojo(pk, part1, part2, randomValue());
         }
 
+        @Override
+        public String toString() {
+            return "TestPojo{"
+                    + "pk="
+                    + pk
+                    + ", part1="
+                    + part1
+                    + ", part2='"
+                    + part2
+                    + '\''
+                    + ", value='"
+                    + value
+                    + '\''
+                    + '}';
+        }
+
         public static void reset() {
             increaseKey = 0;
         }
@@ -493,8 +682,7 @@ public class OrphanFilesCleanTest {
         return StringUtils.getRandomString(RANDOM, 5, 20, 'a', 'z');
     }
 
-    private FileStoreTable createFileStoreTable(RowType rowType) throws 
Exception {
-        Options conf = new Options();
+    private FileStoreTable createFileStoreTable(RowType rowType, Options conf) 
throws Exception {
         conf.set(CoreOptions.PATH, tablePath.toString());
         conf.set(CoreOptions.BUCKET, RANDOM.nextInt(3) + 1);
         TableSchema tableSchema =
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index 6123a9e99..138a30d5b 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -845,6 +845,7 @@ public abstract class FileStoreTableTestBase {
             Options options = new Options();
             options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 5);
             options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 5);
+            options.set(SNAPSHOT_EXPIRE_LIMIT, Integer.MAX_VALUE);
             options.set(CHANGELOG_NUM_RETAINED_MIN, 5);
             options.set(CHANGELOG_NUM_RETAINED_MAX, 5);
             table.copy(options.toMap()).newCommit("").expireSnapshots();
@@ -881,6 +882,12 @@ public abstract class FileStoreTableTestBase {
 
     private FileStoreTable prepareRollbackTable(int commitTimes) throws 
Exception {
         FileStoreTable table = createFileStoreTable();
+        prepareRollbackTable(commitTimes, table);
+        return table;
+    }
+
+    protected FileStoreTable prepareRollbackTable(int commitTimes, 
FileStoreTable table)
+            throws Exception {
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index db6e12305..a1fc6f0b0 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -66,6 +66,8 @@ import org.apache.paimon.utils.CompatibilityTestUtils;
 
 import org.junit.jupiter.api.Test;
 
+import java.io.File;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -73,14 +75,19 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MAX;
+import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MIN;
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
 import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
+import static org.apache.paimon.CoreOptions.SNAPSHOT_EXPIRE_LIMIT;
 import static org.apache.paimon.CoreOptions.TARGET_FILE_SIZE;
 import static org.apache.paimon.Snapshot.CommitKind.COMPACT;
 import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
@@ -1371,6 +1378,79 @@ public class PrimaryKeyFileStoreTableTest extends 
FileStoreTableTestBase {
                                 
"1|2|200|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
+    @Test
+    public void testRollbackToTagWithChangelogDecoupled() throws Exception {
+        int commitTimes = ThreadLocalRandom.current().nextInt(100) + 5;
+        FileStoreTable table =
+                createFileStoreTable(
+                        options ->
+                                options.set(
+                                        CoreOptions.CHANGELOG_PRODUCER,
+                                        CoreOptions.ChangelogProducer.INPUT));
+        prepareRollbackTable(commitTimes, table);
+
+        // commitTimes >= 5, so the three tagged snapshot ids are distinct and ascending.
+        int t1 = 1;
+        int t2 = commitTimes - 3;
+        int t3 = commitTimes - 1;
+        table.createTag("test1", t1);
+        table.createTag("test2", t2);
+        table.createTag("test3", t3);
+
+        // Expire snapshots, retaining 2 snapshots and 5 changelogs:
+        // -------------changelog--------- |-------snapshot------|
+        // s(1)test1 -------- s(c - 3)test2 --- s(c - 1) test3
+        Options options = new Options();
+        options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 2);
+        options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 2);
+        // Lift the per-call expiration limit (presumably so one call expires all eligible).
+        options.set(SNAPSHOT_EXPIRE_LIMIT, Integer.MAX_VALUE);
+        options.set(CHANGELOG_NUM_RETAINED_MIN, 5);
+        options.set(CHANGELOG_NUM_RETAINED_MAX, 5);
+        table.copy(options.toMap()).newCommit("").expireSnapshots();
+
+        table.rollbackTo("test3");
+        assertReadChangelog(t3, table);
+
+        table.rollbackTo("test2");
+        assertReadChangelog(t2, table);
+
+        table.rollbackTo("test1");
+
+        // Expected entries: those of testRollbackToSnapshotCase0 plus 4 more:
+        // table-path/tag/tag-test1
+        // table-path/changelog
+        // table-path/changelog/LATEST
+        // table-path/changelog/EARLIEST
+        // Files.walk keeps directory handles open until the stream is closed.
+        List<java.nio.file.Path> files;
+        try (java.util.stream.Stream<java.nio.file.Path> walk =
+                Files.walk(new File(tablePath.toUri().getPath()).toPath())) {
+            files = walk.collect(Collectors.toList());
+        }
+        assertThat(files).hasSize(19);
+    }
+
+    /**
+     * Streams the changelog starting at snapshot {@code id} and checks the rows it yields.
+     *
+     * @param id snapshot id passed as {@code scan.snapshot-id}
+     * @param table table to read; not modified, a configured copy is used instead
+     */
+    private void assertReadChangelog(int id, FileStoreTable table) throws Exception {
+        FileStoreTable fromSnapshot =
+                table.copy(
+                        Collections.singletonMap(
+                                CoreOptions.SCAN_SNAPSHOT_ID.key(), String.valueOf(id)));
+        ReadBuilder readBuilder = fromSnapshot.newReadBuilder();
+        StreamTableScan scan = readBuilder.newStreamScan();
+
+        // Discard the first plan (the NextSnapshot step); the second one is read.
+        scan.plan();
+        List<String> results =
+                getResult(readBuilder.newRead(), scan.plan().splits(), BATCH_ROW_TO_STRING);
+
+        if (id == 1) {
+            assertThat(results).isEmpty();
+            return;
+        }
+
+        int previous = id - 1;
+        String expectedRow =
+                String.format(
+                        "%s|%s|%s|binary|varbinary|mapKey:mapVal|multiset",
+                        previous, previous * 10, previous * 100);
+        assertThat(results).containsExactlyInAnyOrder(expectedRow);
+    }
+
     private void innerTestTableQuery(FileStoreTable table) throws Exception {
         IOManager ioManager = IOManager.create(tablePath.toString());
         StreamTableWrite write = 
table.newWrite(commitUser).withIOManager(ioManager);


Reply via email to