This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d35b1eec4 [core] Changelog decouple supports delta files (#3407)
d35b1eec4 is described below

commit d35b1eec418822ded38c0f4d3ba379d6d287c68a
Author: Aitozi <[email protected]>
AuthorDate: Tue Jun 11 10:17:59 2024 +0800

    [core] Changelog decouple supports delta files (#3407)
---
 .../java/org/apache/paimon/AbstractFileStore.java  |  3 +-
 .../apache/paimon/operation/FileDeletionBase.java  | 11 +++-
 .../apache/paimon/operation/OrphanFilesClean.java  | 75 +++++++++++++++++----
 .../apache/paimon/operation/SnapshotDeletion.java  | 28 +++++++-
 .../org/apache/paimon/operation/TagDeletion.java   |  2 +-
 .../paimon/table/AbstractFileStoreTable.java       |  1 +
 .../org/apache/paimon/table/RollbackHelper.java    | 25 ++++---
 .../paimon/table/source/AbstractDataTableScan.java |  3 -
 .../ContinuousFromSnapshotStartingScanner.java     |  9 +--
 .../ContinuousFromTimestampStartingScanner.java    |  7 +-
 .../apache/paimon/utils/NextSnapshotFetcher.java   |  7 +-
 .../test/java/org/apache/paimon/TestFileStore.java | 76 +++++++++++++++++++---
 .../paimon/operation/ExpireSnapshotsTest.java      |  2 +-
 .../paimon/operation/OrphanFilesCleanTest.java     |  9 ++-
 .../paimon/table/PrimaryKeyFileStoreTableTest.java | 12 ++--
 ...ContinuousFromTimestampStartingScannerTest.java | 23 +++----
 .../paimon/flink/ContinuousFileStoreITCase.java    | 12 ++--
 17 files changed, 222 insertions(+), 83 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 8d9eca295..ca8b90994 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -214,7 +214,8 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
                 manifestFileFactory().create(),
                 manifestListFactory().create(),
                 newIndexFileHandler(),
-                newStatsFileHandler());
+                newStatsFileHandler(),
+                options.changelogProducer() != 
CoreOptions.ChangelogProducer.NONE);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index c0b5c289c..9d923f46d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -307,9 +307,14 @@ public abstract class FileDeletionBase<T extends Snapshot> 
{
     }
 
     protected void cleanUnusedManifests(
-            Snapshot snapshot, Set<String> skippingSet, boolean 
deleteChangelog) {
-        cleanUnusedManifestList(snapshot.baseManifestList(), skippingSet);
-        cleanUnusedManifestList(snapshot.deltaManifestList(), skippingSet);
+            Snapshot snapshot,
+            Set<String> skippingSet,
+            boolean deleteDataManifestLists,
+            boolean deleteChangelog) {
+        if (deleteDataManifestLists) {
+            cleanUnusedManifestList(snapshot.baseManifestList(), skippingSet);
+            cleanUnusedManifestList(snapshot.deltaManifestList(), skippingSet);
+        }
         if (deleteChangelog && snapshot.changelogManifestList() != null) {
             cleanUnusedManifestList(snapshot.changelogManifestList(), 
skippingSet);
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index e78174db7..16d854ea4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -212,25 +212,72 @@ public class OrphanFilesClean {
 
     private List<String> getUsedFilesForChangelog(Changelog changelog) {
         List<String> files = new ArrayList<>();
-        if (changelog.changelogManifestList() != null) {
-            files.add(changelog.changelogManifestList());
-        }
-
+        List<ManifestFileMeta> manifestFileMetas = new ArrayList<>();
         try {
             // try to read manifests
-            List<ManifestFileMeta> manifestFileMetas =
-                    retryReadingFiles(
-                            () ->
-                                    manifestList.readWithIOException(
-                                            
changelog.changelogManifestList()));
-            if (manifestFileMetas == null) {
-                return Collections.emptyList();
+            // changelog manifest
+            List<ManifestFileMeta> changelogManifest = new ArrayList<>();
+            if (changelog.changelogManifestList() != null) {
+                files.add(changelog.changelogManifestList());
+                changelogManifest =
+                        retryReadingFiles(
+                                () ->
+                                        manifestList.readWithIOException(
+                                                
changelog.changelogManifestList()));
+                if (changelogManifest != null) {
+                    manifestFileMetas.addAll(changelogManifest);
+                }
             }
-            List<String> manifestFileName =
+
+            // base manifest
+            if (manifestList.exists(changelog.baseManifestList())) {
+                files.add(changelog.baseManifestList());
+                List<ManifestFileMeta> baseManifest =
+                        retryReadingFiles(
+                                () ->
+                                        manifestList.readWithIOException(
+                                                changelog.baseManifestList()));
+                if (baseManifest != null) {
+                    manifestFileMetas.addAll(baseManifest);
+                }
+            }
+
+            // delta manifest
+            List<ManifestFileMeta> deltaManifest = null;
+            if (manifestList.exists(changelog.deltaManifestList())) {
+                files.add(changelog.deltaManifestList());
+                deltaManifest =
+                        retryReadingFiles(
+                                () ->
+                                        manifestList.readWithIOException(
+                                                
changelog.deltaManifestList()));
+                if (deltaManifest != null) {
+                    manifestFileMetas.addAll(deltaManifest);
+                }
+            }
+
+            files.addAll(
                     manifestFileMetas.stream()
                             .map(ManifestFileMeta::fileName)
-                            .collect(Collectors.toList());
-            files.addAll(manifestFileName);
+                            .collect(Collectors.toList()));
+
+            // data file
+            List<String> manifestFileName = new ArrayList<>();
+            if (changelog.changelogManifestList() != null) {
+                manifestFileName.addAll(
+                        changelogManifest == null
+                                ? new ArrayList<>()
+                                : changelogManifest.stream()
+                                        .map(ManifestFileMeta::fileName)
+                                        .collect(Collectors.toList()));
+            } else {
+                manifestFileName.addAll(
+                        deltaManifest == null
+                                ? new ArrayList<>()
+                                : deltaManifest.stream()
+                                        .map(ManifestFileMeta::fileName)
+                                        .collect(Collectors.toList()));
+            }
 
             // try to read data files
             List<String> dataFiles = retryReadingDataFiles(manifestFileName);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
index f7fbdfac2..e83056638 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
@@ -23,6 +23,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestList;
@@ -39,24 +40,45 @@ import java.util.function.Predicate;
 /** Delete snapshot files. */
 public class SnapshotDeletion extends FileDeletionBase<Snapshot> {
 
+    private final boolean produceChangelog;
+
     public SnapshotDeletion(
             FileIO fileIO,
             FileStorePathFactory pathFactory,
             ManifestFile manifestFile,
             ManifestList manifestList,
             IndexFileHandler indexFileHandler,
-            StatsFileHandler statsFileHandler) {
+            StatsFileHandler statsFileHandler,
+            boolean produceChangelog) {
         super(fileIO, pathFactory, manifestFile, manifestList, 
indexFileHandler, statsFileHandler);
+        this.produceChangelog = produceChangelog;
     }
 
     @Override
     public void cleanUnusedDataFiles(Snapshot snapshot, 
Predicate<ManifestEntry> skipper) {
-        cleanUnusedDataFiles(snapshot.deltaManifestList(), skipper);
+        if (changelogDecoupled && !produceChangelog) {
+            // Skip cleaning the 'APPEND' data files here. Entries without file source
+            // information (e.g. files written by an older table version) are treated as
+            // 'APPEND' and skipped as well; their cleanup is left to ExpireChangelogImpl.
+            Predicate<ManifestEntry> enriched =
+                    manifestEntry ->
+                            skipper.test(manifestEntry)
+                                    || 
(manifestEntry.file().fileSource().orElse(FileSource.APPEND)
+                                            == FileSource.APPEND);
+            cleanUnusedDataFiles(snapshot.deltaManifestList(), enriched);
+        } else {
+            cleanUnusedDataFiles(snapshot.deltaManifestList(), skipper);
+        }
     }
 
     @Override
     public void cleanUnusedManifests(Snapshot snapshot, Set<String> 
skippingSet) {
-        cleanUnusedManifests(snapshot, skippingSet, !changelogDecoupled);
+        // Delay cleaning the base and delta manifest lists when changelog decoupling is enabled
+        cleanUnusedManifests(
+                snapshot,
+                skippingSet,
+                !changelogDecoupled || produceChangelog,
+                !changelogDecoupled);
     }
 
     @VisibleForTesting
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
index 01db29ec9..3b1174223 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
@@ -85,7 +85,7 @@ public class TagDeletion extends FileDeletionBase<Snapshot> {
     @Override
     public void cleanUnusedManifests(Snapshot taggedSnapshot, Set<String> 
skippingSet) {
         // doesn't clean changelog files because they are handled by 
SnapshotDeletion
-        cleanUnusedManifests(taggedSnapshot, skippingSet, false);
+        cleanUnusedManifests(taggedSnapshot, skippingSet, true, false);
     }
 
     public Predicate<ManifestEntry> dataFileSkipper(Snapshot fromSnapshot) 
throws Exception {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 6f723f834..bf8857e72 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -570,6 +570,7 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
                 tagManager(),
                 fileIO,
                 store().newSnapshotDeletion(),
+                store().newChangelogDeletion(),
                 store().newTagDeletion());
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java 
b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
index 90801caf0..bd608cdca 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
@@ -22,6 +22,7 @@ import org.apache.paimon.Changelog;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.ChangelogDeletion;
 import org.apache.paimon.operation.SnapshotDeletion;
 import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.utils.SnapshotManager;
@@ -51,6 +52,7 @@ public class RollbackHelper {
     private final TagManager tagManager;
     private final FileIO fileIO;
     private final SnapshotDeletion snapshotDeletion;
+    private final ChangelogDeletion changelogDeletion;
     private final TagDeletion tagDeletion;
 
     public RollbackHelper(
@@ -58,11 +60,13 @@ public class RollbackHelper {
             TagManager tagManager,
             FileIO fileIO,
             SnapshotDeletion snapshotDeletion,
+            ChangelogDeletion changelogDeletion,
             TagDeletion tagDeletion) {
         this.snapshotManager = snapshotManager;
         this.tagManager = tagManager;
         this.fileIO = fileIO;
         this.snapshotDeletion = snapshotDeletion;
+        this.changelogDeletion = changelogDeletion;
         this.tagDeletion = tagDeletion;
     }
 
@@ -72,6 +76,7 @@ public class RollbackHelper {
         List<Snapshot> cleanedSnapshots = 
cleanSnapshotsDataFiles(retainedSnapshot);
         List<Changelog> cleanedChangelogs = 
cleanLongLivedChangelogDataFiles(retainedSnapshot);
         List<Snapshot> cleanedTags = cleanTagsDataFiles(retainedSnapshot);
+        Set<Long> cleanedIds = new HashSet<>();
 
         // clean manifests
         // this can be used for snapshots and tags manifests cleaning both
@@ -79,17 +84,18 @@ public class RollbackHelper {
 
         for (Snapshot snapshot : cleanedSnapshots) {
             snapshotDeletion.cleanUnusedManifests(snapshot, 
manifestsSkippingSet);
+            cleanedIds.add(snapshot.id());
         }
 
         for (Changelog changelog : cleanedChangelogs) {
-            if (changelog.changelogManifestList() != null) {
-                snapshotDeletion.cleanUnusedManifestList(
-                        changelog.changelogManifestList(), new HashSet<>());
-            }
+            changelogDeletion.cleanUnusedManifests(changelog, 
manifestsSkippingSet);
+            cleanedIds.add(changelog.id());
         }
 
-        cleanedTags.removeAll(cleanedSnapshots);
         for (Snapshot snapshot : cleanedTags) {
+            if (cleanedIds.contains(snapshot.id())) {
+                continue;
+            }
             tagDeletion.cleanUnusedManifests(snapshot, manifestsSkippingSet);
         }
 
@@ -122,7 +128,9 @@ public class RollbackHelper {
         // when deleting non-existing data files
         for (Snapshot snapshot : toBeCleaned) {
             
snapshotDeletion.deleteAddedDataFiles(snapshot.deltaManifestList());
-            
snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList());
+            if (snapshot.changelogManifestList() != null) {
+                
snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList());
+            }
         }
 
         // delete directories
@@ -149,9 +157,8 @@ public class RollbackHelper {
 
         // delete data files of changelog
         for (Changelog changelog : toBeCleaned) {
-            if (changelog.changelogManifestList() != null) {
-                
snapshotDeletion.deleteAddedDataFiles(changelog.changelogManifestList());
-            }
+            // clean the deleted file
+            changelogDeletion.cleanUnusedDataFiles(changelog, manifestEntry -> 
false);
         }
 
         // delete directories
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 66b4dd8e4..8300db4c7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -122,7 +122,6 @@ public abstract class AbstractDataTableScan implements 
DataTableScan {
                 return new ContinuousFromSnapshotStartingScanner(
                         snapshotManager,
                         consumer.get().nextSnapshot(),
-                        options.changelogProducer() != ChangelogProducer.NONE,
                         options.changelogLifecycleDecoupled());
             }
         }
@@ -152,7 +151,6 @@ public abstract class AbstractDataTableScan implements 
DataTableScan {
                         ? new ContinuousFromTimestampStartingScanner(
                                 snapshotManager,
                                 startupMillis,
-                                options.changelogProducer() != 
ChangelogProducer.NONE,
                                 options.changelogLifecycleDecoupled())
                         : new 
StaticFromTimestampStartingScanner(snapshotManager, startupMillis);
             case FROM_FILE_CREATION_TIME:
@@ -164,7 +162,6 @@ public abstract class AbstractDataTableScan implements 
DataTableScan {
                             ? new ContinuousFromSnapshotStartingScanner(
                                     snapshotManager,
                                     options.scanSnapshotId(),
-                                    options.changelogProducer() != 
ChangelogProducer.NONE,
                                     options.changelogLifecycleDecoupled())
                             : new StaticFromSnapshotStartingScanner(
                                     snapshotManager, options.scanSnapshotId());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
index 38c01f35d..d8e614222 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
@@ -27,18 +27,13 @@ import org.apache.paimon.utils.SnapshotManager;
  */
 public class ContinuousFromSnapshotStartingScanner extends 
AbstractStartingScanner {
 
-    private final boolean changelogAsFollowup;
     private final boolean changelogDecoupled;
 
     public ContinuousFromSnapshotStartingScanner(
-            SnapshotManager snapshotManager,
-            long snapshotId,
-            boolean changelogAsFollowup,
-            boolean changelogDecoupled) {
+            SnapshotManager snapshotManager, long snapshotId, boolean 
changelogDecoupled) {
         super(snapshotManager);
         this.startingSnapshotId = snapshotId;
         this.changelogDecoupled = changelogDecoupled;
-        this.changelogAsFollowup = changelogAsFollowup;
     }
 
     @Override
@@ -54,7 +49,7 @@ public class ContinuousFromSnapshotStartingScanner extends 
AbstractStartingScann
 
     private Long getEarliestId() {
         Long earliestId;
-        if (changelogAsFollowup && changelogDecoupled) {
+        if (changelogDecoupled) {
             Long earliestChangelogId = 
snapshotManager.earliestLongLivedChangelogId();
             earliestId =
                     earliestChangelogId == null
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
index 7e39e0859..941174835 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
@@ -37,13 +37,10 @@ public class ContinuousFromTimestampStartingScanner extends 
AbstractStartingScan
     private final boolean startFromChangelog;
 
     public ContinuousFromTimestampStartingScanner(
-            SnapshotManager snapshotManager,
-            long startupMillis,
-            boolean changelogAsFollowup,
-            boolean changelogDecoupled) {
+            SnapshotManager snapshotManager, long startupMillis, boolean 
changelogDecoupled) {
         super(snapshotManager);
         this.startupMillis = startupMillis;
-        this.startFromChangelog = changelogAsFollowup && changelogDecoupled;
+        this.startFromChangelog = changelogDecoupled;
         this.startingSnapshotId =
                 this.snapshotManager.earlierThanTimeMills(startupMillis, 
startFromChangelog);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java
index 87f1fb849..204408392 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java
@@ -32,8 +32,6 @@ public class NextSnapshotFetcher {
     public static final Logger LOG = 
LoggerFactory.getLogger(NextSnapshotFetcher.class);
     private final SnapshotManager snapshotManager;
     private final boolean changelogDecoupled;
-    // Only support changelog as follow-up now.
-    private final boolean changelogAsFollowup;
 
     public NextSnapshotFetcher(
             SnapshotManager snapshotManager,
@@ -41,7 +39,6 @@ public class NextSnapshotFetcher {
             boolean changelogAsFollowup) {
         this.snapshotManager = snapshotManager;
         this.changelogDecoupled = changelogDecoupled;
-        this.changelogAsFollowup = changelogAsFollowup;
     }
 
     @Nullable
@@ -59,9 +56,7 @@ public class NextSnapshotFetcher {
             return null;
         }
 
-        if (!changelogAsFollowup
-                || !changelogDecoupled
-                || !snapshotManager.longLivedChangelogExists(nextSnapshotId)) {
+        if (!changelogDecoupled || 
!snapshotManager.longLivedChangelogExists(nextSnapshotId)) {
             throw new OutOfRangeException(
                     String.format(
                             "The snapshot with id %d has expired. You can: "
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 91b08635f..2b0c66380 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -26,6 +26,8 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
@@ -607,6 +609,12 @@ public class TestFileStore extends KeyValueFileStore {
             FileStorePathFactory pathFactory,
             ManifestList manifestList) {
         Set<Path> result = new HashSet<>();
+        SchemaManager schemaManager = new SchemaManager(fileIO, 
snapshotManager.tablePath());
+        CoreOptions options = new 
CoreOptions(schemaManager.latest().get().options());
+        boolean produceChangelog =
+                options.changelogProducer() != 
CoreOptions.ChangelogProducer.NONE;
+        // The option from the table may not align with the expiration config
+        boolean changelogDecoupled = 
snapshotManager.earliestLongLivedChangelogId() != null;
 
         Path snapshotPath = snapshotManager.snapshotPath(snapshotId);
         Snapshot snapshot = Snapshot.fromPath(fileIO, snapshotPath);
@@ -634,6 +642,27 @@ public class TestFileStore extends KeyValueFileStore {
                             entry.file().fileName()));
         }
 
+        // Add the 'APPEND' files with kind 'DELETE' in this snapshot.
+        // These 'DELETE' entries can be merged away by plan#splits,
+        // so they are not shown in the entries above.
+        // In other words, these files are no longer used (by snapshot or changelog),
+        // but they can only be cleaned after this snapshot has expired, so we should
+        // add them to the list of files in use.
+        if (changelogDecoupled && !produceChangelog) {
+            entries = 
scan.withManifestList(snapshot.deltaManifests(manifestList)).plan().files();
+            for (ManifestEntry entry : entries) {
+                // deletion of 'APPEND' files marked 'DELETE' is delayed
+                if (entry.kind() == FileKind.DELETE
+                        && entry.file().fileSource().orElse(FileSource.APPEND)
+                                == FileSource.APPEND) {
+                    result.add(
+                            new Path(
+                                    pathFactory.bucketPath(entry.partition(), 
entry.bucket()),
+                                    entry.file().fileName()));
+                }
+            }
+        }
+
         return result;
     }
 
@@ -645,6 +674,10 @@ public class TestFileStore extends KeyValueFileStore {
             FileStorePathFactory pathFactory,
             ManifestList manifestList) {
         Set<Path> result = new HashSet<>();
+        SchemaManager schemaManager = new SchemaManager(fileIO, 
snapshotManager.tablePath());
+        CoreOptions options = new 
CoreOptions(schemaManager.latest().get().options());
+        boolean produceChangelog =
+                options.changelogProducer() != 
CoreOptions.ChangelogProducer.NONE;
 
         Path changelogPath = 
snapshotManager.longLivedChangelogPath(changelogId);
         Changelog changelog = Changelog.fromPath(fileIO, changelogPath);
@@ -653,23 +686,50 @@ public class TestFileStore extends KeyValueFileStore {
         result.add(changelogPath);
 
         // manifest lists
+        if (!produceChangelog) {
+            
result.add(pathFactory.toManifestListPath(changelog.baseManifestList()));
+            
result.add(pathFactory.toManifestListPath(changelog.deltaManifestList()));
+        }
         if (changelog.changelogManifestList() != null) {
             
result.add(pathFactory.toManifestListPath(changelog.changelogManifestList()));
         }
 
         // manifests
-        List<ManifestFileMeta> manifests = new ArrayList<>();
-        manifests.addAll(changelog.changelogManifests(manifestList));
+        List<ManifestFileMeta> manifests =
+                new ArrayList<>(changelog.changelogManifests(manifestList));
+        if (!produceChangelog) {
+            manifests.addAll(changelog.dataManifests(manifestList));
+        }
 
         manifests.forEach(m -> 
result.add(pathFactory.toManifestFilePath(m.fileName())));
 
         // data file
-        List<ManifestEntry> entries = 
scan.withManifestList(manifests).plan().files();
-        for (ManifestEntry entry : entries) {
-            result.add(
-                    new Path(
-                            pathFactory.bucketPath(entry.partition(), 
entry.bucket()),
-                            entry.file().fileName()));
+        // not all manifests contain useful data files
+        // (1) produceChangelog = 'true': data file in changelog manifests
+        // (2) produceChangelog = 'false': 'APPEND' data file in delta 
manifests
+
+        // delta file
+        if (!produceChangelog) {
+            for (ManifestEntry entry :
+                    
scan.withManifestList(changelog.deltaManifests(manifestList)).plan().files()) {
+                if (entry.file().fileSource().orElse(FileSource.APPEND) == 
FileSource.APPEND) {
+                    result.add(
+                            new Path(
+                                    pathFactory.bucketPath(entry.partition(), 
entry.bucket()),
+                                    entry.file().fileName()));
+                }
+            }
+        } else {
+            // changelog
+            for (ManifestEntry entry :
+                    
scan.withManifestList(changelog.changelogManifests(manifestList))
+                            .plan()
+                            .files()) {
+                result.add(
+                        new Path(
+                                pathFactory.bucketPath(entry.partition(), 
entry.bucket()),
+                                entry.file().fileName()));
+            }
         }
         return result;
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
index 7c725164e..6af2d3051 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
@@ -85,7 +85,7 @@ public class ExpireSnapshotsTest {
                         
TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
                         TestKeyValueGenerator.getPrimaryKeys(
                                 
TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
-                        Collections.emptyMap(),
+                        store.options().toMap(),
                         null));
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
index 129f928c9..7ca2b58d7 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
@@ -59,6 +59,8 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -328,8 +330,10 @@ public class OrphanFilesCleanTest {
         
assertThat(result).containsExactlyInAnyOrderElementsOf(TestPojo.formatData(data));
     }
 
-    @Test
-    public void testCleanOrphanFilesWithChangelogDecoupled() throws Exception {
+    @ValueSource(strings = {"none", "input"})
+    @ParameterizedTest(name = "changelog-producer = {0}")
+    public void testCleanOrphanFilesWithChangelogDecoupled(String 
changelogProducer)
+            throws Exception {
         // recreate the table with another option
         this.write.close();
         this.commit.close();
@@ -338,6 +342,7 @@ public class OrphanFilesCleanTest {
         options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT);
         options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 15);
         options.set(CoreOptions.CHANGELOG_NUM_RETAINED_MAX, 20);
+        options.setString(CoreOptions.CHANGELOG_PRODUCER.key(), changelogProducer);
         FileStoreTable table = createFileStoreTable(rowType, options);
         String commitUser = UUID.randomUUID().toString();
         this.table = table;
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 8026becfb..095511ae1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -65,6 +65,8 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CompatibilityTestUtils;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.nio.file.Files;
@@ -1491,15 +1493,15 @@ public class PrimaryKeyFileStoreTableTest extends FileStoreTableTestBase {
                                 "1|2|200|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
-    @Test
-    public void testRollbackToTagWithChangelogDecoupled() throws Exception {
+    @ParameterizedTest(name = "changelog-producer = {0}")
+    @ValueSource(strings = {"none", "input"})
+    public void testRollbackToTagWithChangelogDecoupled(String changelogProducer) throws Exception {
         int commitTimes = ThreadLocalRandom.current().nextInt(100) + 6;
         FileStoreTable table =
                 createFileStoreTable(
                         options ->
-                                options.set(
-                                        CoreOptions.CHANGELOG_PRODUCER,
-                                        CoreOptions.ChangelogProducer.INPUT));
+                                options.setString(
+                                        CoreOptions.CHANGELOG_PRODUCER.key(), changelogProducer));
         prepareRollbackTable(commitTimes, table);
 
         int t1 = 1;
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
index 51c2c10e7..46093f2cb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
@@ -29,6 +29,8 @@ import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TraceableFileIO;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.UUID;
 
@@ -65,8 +67,7 @@ public class ContinuousFromTimestampStartingScannerTest extends ScannerTestBase
         long timestamp = snapshotManager.snapshot(3).timeMillis();
 
         ContinuousFromTimestampStartingScanner scanner =
-                new ContinuousFromTimestampStartingScanner(
-                        snapshotManager, timestamp, false, false);
+                new ContinuousFromTimestampStartingScanner(snapshotManager, timestamp, false);
         StartingScanner.NextSnapshot result =
                 (StartingScanner.NextSnapshot) scanner.scan(snapshotReader);
         assertThat(result.nextSnapshotId()).isEqualTo(3);
@@ -80,7 +81,7 @@ public class ContinuousFromTimestampStartingScannerTest extends ScannerTestBase
         SnapshotManager snapshotManager = table.snapshotManager();
         ContinuousFromTimestampStartingScanner scanner =
                 new ContinuousFromTimestampStartingScanner(
-                        snapshotManager, System.currentTimeMillis(), false, false);
+                        snapshotManager, System.currentTimeMillis(), false);
         assertThat(scanner.scan(snapshotReader)).isInstanceOf(StartingScanner.NoSnapshot.class);
     }
 
@@ -100,8 +101,7 @@ public class ContinuousFromTimestampStartingScannerTest extends ScannerTestBase
         long timestamp = snapshotManager.snapshot(1).timeMillis();
 
         ContinuousFromTimestampStartingScanner scanner =
-                new ContinuousFromTimestampStartingScanner(
-                        snapshotManager, timestamp, false, false);
+                new ContinuousFromTimestampStartingScanner(snapshotManager, timestamp, false);
         StartingScanner.NextSnapshot result =
                 (StartingScanner.NextSnapshot) scanner.scan(snapshotReader);
         // next snapshot
@@ -111,14 +111,15 @@ public class ContinuousFromTimestampStartingScannerTest extends ScannerTestBase
         commit.close();
     }
 
-    @Test
-    public void testScanFromChangelog() throws Exception {
+    @ParameterizedTest(name = "changelog-producer = {0}")
+    @ValueSource(strings = {"none", "input"})
+    public void testScanFromChangelog(String changelogProducer) throws Exception {
         Options options = new Options();
         options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 2);
         options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 1);
         options.set(CoreOptions.CHANGELOG_NUM_RETAINED_MAX, 20);
         options.set(CoreOptions.CHANGELOG_NUM_RETAINED_MIN, 10);
-        options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT);
+        options.setString(CoreOptions.CHANGELOG_PRODUCER.key(), changelogProducer);
         FileStoreTable table =
                 createFileStoreTable(
                         true,
@@ -158,20 +159,20 @@ public class ContinuousFromTimestampStartingScannerTest extends ScannerTestBase
 
         ContinuousFromTimestampStartingScanner scanner =
                 new ContinuousFromTimestampStartingScanner(
-                        snapshotManager, snapshotManager.snapshot(3).timeMillis(), true, true);
+                        snapshotManager, snapshotManager.snapshot(3).timeMillis(), true);
         StartingScanner.NextSnapshot result =
                 (StartingScanner.NextSnapshot) scanner.scan(snapshotReader);
         assertThat(result.nextSnapshotId()).isEqualTo(3);
         scanner =
                 new ContinuousFromTimestampStartingScanner(
-                        snapshotManager, snapshotManager.snapshot(2).timeMillis(), true, true);
+                        snapshotManager, snapshotManager.snapshot(2).timeMillis(), true);
 
         assertThat(((StartingScanner.NextSnapshot) scanner.scan(snapshotReader)).nextSnapshotId())
                 .isEqualTo(2);
 
         scanner =
                 new ContinuousFromTimestampStartingScanner(
-                        snapshotManager, snapshotManager.changelog(1).timeMillis(), true, true);
+                        snapshotManager, snapshotManager.changelog(1).timeMillis(), true);
         assertThat(((StartingScanner.NextSnapshot) scanner.scan(snapshotReader)).nextSnapshotId())
                 .isEqualTo(1);
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index c60b26f22..de3ab0c5f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -33,6 +33,8 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -604,16 +606,18 @@ public class ContinuousFileStoreITCase extends CatalogITCaseBase {
         assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(3, "c"));
     }
 
-    @Test
-    public void testScanFromChangelog() throws Exception {
+    @ParameterizedTest(name = "changelog-producer = {0}")
+    @ValueSource(strings = {"none", "input"})
+    public void testScanFromChangelog(String changelogProducer) throws Exception {
         batchSql(
                 "CREATE TABLE IF NOT EXISTS T3 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)\n"
-                        + " WITH ('changelog-producer'='input', 'bucket' = '1', \n"
+                        + " WITH ('changelog-producer'='%s', 'bucket' = '1', \n"
                         + " 'snapshot.num-retained.max' = '2',\n"
                         + " 'snapshot.num-retained.min' = '1',\n"
                         + " 'changelog.num-retained.max' = '3',\n"
                         + " 'changelog.num-retained.min' = '1'\n"
-                        + ")");
+                        + ")",
+                changelogProducer);
 
         batchSql("INSERT INTO T3 VALUES ('1', '2', '3')");
         batchSql("INSERT INTO T3 VALUES ('4', '5', '6')");


Reply via email to