This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d35b1eec4 [core] Changelog decouple supports delta files (#3407)
d35b1eec4 is described below
commit d35b1eec418822ded38c0f4d3ba379d6d287c68a
Author: Aitozi <[email protected]>
AuthorDate: Tue Jun 11 10:17:59 2024 +0800
[core] Changelog decouple supports delta files (#3407)
---
.../java/org/apache/paimon/AbstractFileStore.java | 3 +-
.../apache/paimon/operation/FileDeletionBase.java | 11 +++-
.../apache/paimon/operation/OrphanFilesClean.java | 75 +++++++++++++++++----
.../apache/paimon/operation/SnapshotDeletion.java | 28 +++++++-
.../org/apache/paimon/operation/TagDeletion.java | 2 +-
.../paimon/table/AbstractFileStoreTable.java | 1 +
.../org/apache/paimon/table/RollbackHelper.java | 25 ++++---
.../paimon/table/source/AbstractDataTableScan.java | 3 -
.../ContinuousFromSnapshotStartingScanner.java | 9 +--
.../ContinuousFromTimestampStartingScanner.java | 7 +-
.../apache/paimon/utils/NextSnapshotFetcher.java | 7 +-
.../test/java/org/apache/paimon/TestFileStore.java | 76 +++++++++++++++++++---
.../paimon/operation/ExpireSnapshotsTest.java | 2 +-
.../paimon/operation/OrphanFilesCleanTest.java | 9 ++-
.../paimon/table/PrimaryKeyFileStoreTableTest.java | 12 ++--
...ContinuousFromTimestampStartingScannerTest.java | 23 +++----
.../paimon/flink/ContinuousFileStoreITCase.java | 12 ++--
17 files changed, 222 insertions(+), 83 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 8d9eca295..ca8b90994 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -214,7 +214,8 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
manifestFileFactory().create(),
manifestListFactory().create(),
newIndexFileHandler(),
- newStatsFileHandler());
+ newStatsFileHandler(),
+ options.changelogProducer() !=
CoreOptions.ChangelogProducer.NONE);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index c0b5c289c..9d923f46d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -307,9 +307,14 @@ public abstract class FileDeletionBase<T extends Snapshot>
{
}
protected void cleanUnusedManifests(
- Snapshot snapshot, Set<String> skippingSet, boolean
deleteChangelog) {
- cleanUnusedManifestList(snapshot.baseManifestList(), skippingSet);
- cleanUnusedManifestList(snapshot.deltaManifestList(), skippingSet);
+ Snapshot snapshot,
+ Set<String> skippingSet,
+ boolean deleteDataManifestLists,
+ boolean deleteChangelog) {
+ if (deleteDataManifestLists) {
+ cleanUnusedManifestList(snapshot.baseManifestList(), skippingSet);
+ cleanUnusedManifestList(snapshot.deltaManifestList(), skippingSet);
+ }
if (deleteChangelog && snapshot.changelogManifestList() != null) {
cleanUnusedManifestList(snapshot.changelogManifestList(),
skippingSet);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index e78174db7..16d854ea4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -212,25 +212,72 @@ public class OrphanFilesClean {
private List<String> getUsedFilesForChangelog(Changelog changelog) {
List<String> files = new ArrayList<>();
- if (changelog.changelogManifestList() != null) {
- files.add(changelog.changelogManifestList());
- }
-
+ List<ManifestFileMeta> manifestFileMetas = new ArrayList<>();
try {
// try to read manifests
- List<ManifestFileMeta> manifestFileMetas =
- retryReadingFiles(
- () ->
- manifestList.readWithIOException(
-
changelog.changelogManifestList()));
- if (manifestFileMetas == null) {
- return Collections.emptyList();
+ // changelog manifest
+ List<ManifestFileMeta> changelogManifest = new ArrayList<>();
+ if (changelog.changelogManifestList() != null) {
+ files.add(changelog.changelogManifestList());
+ changelogManifest =
+ retryReadingFiles(
+ () ->
+ manifestList.readWithIOException(
+
changelog.changelogManifestList()));
+ if (changelogManifest != null) {
+ manifestFileMetas.addAll(changelogManifest);
+ }
}
- List<String> manifestFileName =
+
+ // base manifest
+ if (manifestList.exists(changelog.baseManifestList())) {
+ files.add(changelog.baseManifestList());
+ List<ManifestFileMeta> baseManifest =
+ retryReadingFiles(
+ () ->
+ manifestList.readWithIOException(
+ changelog.baseManifestList()));
+ if (baseManifest != null) {
+ manifestFileMetas.addAll(baseManifest);
+ }
+ }
+
+ // delta manifest
+ List<ManifestFileMeta> deltaManifest = null;
+ if (manifestList.exists(changelog.deltaManifestList())) {
+ files.add(changelog.deltaManifestList());
+ deltaManifest =
+ retryReadingFiles(
+ () ->
+ manifestList.readWithIOException(
+
changelog.deltaManifestList()));
+ if (deltaManifest != null) {
+ manifestFileMetas.addAll(deltaManifest);
+ }
+ }
+
+ files.addAll(
manifestFileMetas.stream()
.map(ManifestFileMeta::fileName)
- .collect(Collectors.toList());
- files.addAll(manifestFileName);
+ .collect(Collectors.toList()));
+
+ // data file
+ List<String> manifestFileName = new ArrayList<>();
+ if (changelog.changelogManifestList() != null) {
+ manifestFileName.addAll(
+ changelogManifest == null
+ ? new ArrayList<>()
+ : changelogManifest.stream()
+ .map(ManifestFileMeta::fileName)
+ .collect(Collectors.toList()));
+ } else {
+ manifestFileName.addAll(
+ deltaManifest == null
+ ? new ArrayList<>()
+ : deltaManifest.stream()
+ .map(ManifestFileMeta::fileName)
+ .collect(Collectors.toList()));
+ }
// try to read data files
List<String> dataFiles = retryReadingDataFiles(manifestFileName);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
index f7fbdfac2..e83056638 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
@@ -23,6 +23,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
@@ -39,24 +40,45 @@ import java.util.function.Predicate;
/** Delete snapshot files. */
public class SnapshotDeletion extends FileDeletionBase<Snapshot> {
+ private final boolean produceChangelog;
+
public SnapshotDeletion(
FileIO fileIO,
FileStorePathFactory pathFactory,
ManifestFile manifestFile,
ManifestList manifestList,
IndexFileHandler indexFileHandler,
- StatsFileHandler statsFileHandler) {
+ StatsFileHandler statsFileHandler,
+ boolean produceChangelog) {
super(fileIO, pathFactory, manifestFile, manifestList,
indexFileHandler, statsFileHandler);
+ this.produceChangelog = produceChangelog;
}
@Override
public void cleanUnusedDataFiles(Snapshot snapshot,
Predicate<ManifestEntry> skipper) {
- cleanUnusedDataFiles(snapshot.deltaManifestList(), skipper);
+ if (changelogDecoupled && !produceChangelog) {
+ // Skip cleaning the 'APPEND' data files. If the file source information is
+ // unavailable (e.g. files written by an older table version), skip them here too
+ // and let ExpireChangelogImpl clean them.
+ Predicate<ManifestEntry> enriched =
+ manifestEntry ->
+ skipper.test(manifestEntry)
+ ||
(manifestEntry.file().fileSource().orElse(FileSource.APPEND)
+ == FileSource.APPEND);
+ cleanUnusedDataFiles(snapshot.deltaManifestList(), enriched);
+ } else {
+ cleanUnusedDataFiles(snapshot.deltaManifestList(), skipper);
+ }
}
@Override
public void cleanUnusedManifests(Snapshot snapshot, Set<String>
skippingSet) {
- cleanUnusedManifests(snapshot, skippingSet, !changelogDecoupled);
+ // delay cleaning the base and delta manifest lists when changelog decoupling is enabled
+ cleanUnusedManifests(
+ snapshot,
+ skippingSet,
+ !changelogDecoupled || produceChangelog,
+ !changelogDecoupled);
}
@VisibleForTesting
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
index 01db29ec9..3b1174223 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
@@ -85,7 +85,7 @@ public class TagDeletion extends FileDeletionBase<Snapshot> {
@Override
public void cleanUnusedManifests(Snapshot taggedSnapshot, Set<String>
skippingSet) {
// doesn't clean changelog files because they are handled by
SnapshotDeletion
- cleanUnusedManifests(taggedSnapshot, skippingSet, false);
+ cleanUnusedManifests(taggedSnapshot, skippingSet, true, false);
}
public Predicate<ManifestEntry> dataFileSkipper(Snapshot fromSnapshot)
throws Exception {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 6f723f834..bf8857e72 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -570,6 +570,7 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
tagManager(),
fileIO,
store().newSnapshotDeletion(),
+ store().newChangelogDeletion(),
store().newTagDeletion());
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
index 90801caf0..bd608cdca 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
@@ -22,6 +22,7 @@ import org.apache.paimon.Changelog;
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.ChangelogDeletion;
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.utils.SnapshotManager;
@@ -51,6 +52,7 @@ public class RollbackHelper {
private final TagManager tagManager;
private final FileIO fileIO;
private final SnapshotDeletion snapshotDeletion;
+ private final ChangelogDeletion changelogDeletion;
private final TagDeletion tagDeletion;
public RollbackHelper(
@@ -58,11 +60,13 @@ public class RollbackHelper {
TagManager tagManager,
FileIO fileIO,
SnapshotDeletion snapshotDeletion,
+ ChangelogDeletion changelogDeletion,
TagDeletion tagDeletion) {
this.snapshotManager = snapshotManager;
this.tagManager = tagManager;
this.fileIO = fileIO;
this.snapshotDeletion = snapshotDeletion;
+ this.changelogDeletion = changelogDeletion;
this.tagDeletion = tagDeletion;
}
@@ -72,6 +76,7 @@ public class RollbackHelper {
List<Snapshot> cleanedSnapshots =
cleanSnapshotsDataFiles(retainedSnapshot);
List<Changelog> cleanedChangelogs =
cleanLongLivedChangelogDataFiles(retainedSnapshot);
List<Snapshot> cleanedTags = cleanTagsDataFiles(retainedSnapshot);
+ Set<Long> cleanedIds = new HashSet<>();
// clean manifests
// this can be used for snapshots and tags manifests cleaning both
@@ -79,17 +84,18 @@ public class RollbackHelper {
for (Snapshot snapshot : cleanedSnapshots) {
snapshotDeletion.cleanUnusedManifests(snapshot,
manifestsSkippingSet);
+ cleanedIds.add(snapshot.id());
}
for (Changelog changelog : cleanedChangelogs) {
- if (changelog.changelogManifestList() != null) {
- snapshotDeletion.cleanUnusedManifestList(
- changelog.changelogManifestList(), new HashSet<>());
- }
+ changelogDeletion.cleanUnusedManifests(changelog,
manifestsSkippingSet);
+ cleanedIds.add(changelog.id());
}
- cleanedTags.removeAll(cleanedSnapshots);
for (Snapshot snapshot : cleanedTags) {
+ if (cleanedIds.contains(snapshot.id())) {
+ continue;
+ }
tagDeletion.cleanUnusedManifests(snapshot, manifestsSkippingSet);
}
@@ -122,7 +128,9 @@ public class RollbackHelper {
// when deleting non-existing data files
for (Snapshot snapshot : toBeCleaned) {
snapshotDeletion.deleteAddedDataFiles(snapshot.deltaManifestList());
-
snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList());
+ if (snapshot.changelogManifestList() != null) {
+
snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList());
+ }
}
// delete directories
@@ -149,9 +157,8 @@ public class RollbackHelper {
// delete data files of changelog
for (Changelog changelog : toBeCleaned) {
- if (changelog.changelogManifestList() != null) {
-
snapshotDeletion.deleteAddedDataFiles(changelog.changelogManifestList());
- }
+ // clean the deleted file
+ changelogDeletion.cleanUnusedDataFiles(changelog, manifestEntry ->
false);
}
// delete directories
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 66b4dd8e4..8300db4c7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -122,7 +122,6 @@ public abstract class AbstractDataTableScan implements
DataTableScan {
return new ContinuousFromSnapshotStartingScanner(
snapshotManager,
consumer.get().nextSnapshot(),
- options.changelogProducer() != ChangelogProducer.NONE,
options.changelogLifecycleDecoupled());
}
}
@@ -152,7 +151,6 @@ public abstract class AbstractDataTableScan implements
DataTableScan {
? new ContinuousFromTimestampStartingScanner(
snapshotManager,
startupMillis,
- options.changelogProducer() !=
ChangelogProducer.NONE,
options.changelogLifecycleDecoupled())
: new
StaticFromTimestampStartingScanner(snapshotManager, startupMillis);
case FROM_FILE_CREATION_TIME:
@@ -164,7 +162,6 @@ public abstract class AbstractDataTableScan implements
DataTableScan {
? new ContinuousFromSnapshotStartingScanner(
snapshotManager,
options.scanSnapshotId(),
- options.changelogProducer() !=
ChangelogProducer.NONE,
options.changelogLifecycleDecoupled())
: new StaticFromSnapshotStartingScanner(
snapshotManager, options.scanSnapshotId());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
index 38c01f35d..d8e614222 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
@@ -27,18 +27,13 @@ import org.apache.paimon.utils.SnapshotManager;
*/
public class ContinuousFromSnapshotStartingScanner extends
AbstractStartingScanner {
- private final boolean changelogAsFollowup;
private final boolean changelogDecoupled;
public ContinuousFromSnapshotStartingScanner(
- SnapshotManager snapshotManager,
- long snapshotId,
- boolean changelogAsFollowup,
- boolean changelogDecoupled) {
+ SnapshotManager snapshotManager, long snapshotId, boolean
changelogDecoupled) {
super(snapshotManager);
this.startingSnapshotId = snapshotId;
this.changelogDecoupled = changelogDecoupled;
- this.changelogAsFollowup = changelogAsFollowup;
}
@Override
@@ -54,7 +49,7 @@ public class ContinuousFromSnapshotStartingScanner extends
AbstractStartingScann
private Long getEarliestId() {
Long earliestId;
- if (changelogAsFollowup && changelogDecoupled) {
+ if (changelogDecoupled) {
Long earliestChangelogId =
snapshotManager.earliestLongLivedChangelogId();
earliestId =
earliestChangelogId == null
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
index 7e39e0859..941174835 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
@@ -37,13 +37,10 @@ public class ContinuousFromTimestampStartingScanner extends
AbstractStartingScan
private final boolean startFromChangelog;
public ContinuousFromTimestampStartingScanner(
- SnapshotManager snapshotManager,
- long startupMillis,
- boolean changelogAsFollowup,
- boolean changelogDecoupled) {
+ SnapshotManager snapshotManager, long startupMillis, boolean
changelogDecoupled) {
super(snapshotManager);
this.startupMillis = startupMillis;
- this.startFromChangelog = changelogAsFollowup && changelogDecoupled;
+ this.startFromChangelog = changelogDecoupled;
this.startingSnapshotId =
this.snapshotManager.earlierThanTimeMills(startupMillis,
startFromChangelog);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java
b/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java
index 87f1fb849..204408392 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java
@@ -32,8 +32,6 @@ public class NextSnapshotFetcher {
public static final Logger LOG =
LoggerFactory.getLogger(NextSnapshotFetcher.class);
private final SnapshotManager snapshotManager;
private final boolean changelogDecoupled;
- // Only support changelog as follow-up now.
- private final boolean changelogAsFollowup;
public NextSnapshotFetcher(
SnapshotManager snapshotManager,
@@ -41,7 +39,6 @@ public class NextSnapshotFetcher {
boolean changelogAsFollowup) {
this.snapshotManager = snapshotManager;
this.changelogDecoupled = changelogDecoupled;
- this.changelogAsFollowup = changelogAsFollowup;
}
@Nullable
@@ -59,9 +56,7 @@ public class NextSnapshotFetcher {
return null;
}
- if (!changelogAsFollowup
- || !changelogDecoupled
- || !snapshotManager.longLivedChangelogExists(nextSnapshotId)) {
+ if (!changelogDecoupled ||
!snapshotManager.longLivedChangelogExists(nextSnapshotId)) {
throw new OutOfRangeException(
String.format(
"The snapshot with id %d has expired. You can: "
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 91b08635f..2b0c66380 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -26,6 +26,8 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
@@ -607,6 +609,12 @@ public class TestFileStore extends KeyValueFileStore {
FileStorePathFactory pathFactory,
ManifestList manifestList) {
Set<Path> result = new HashSet<>();
+ SchemaManager schemaManager = new SchemaManager(fileIO,
snapshotManager.tablePath());
+ CoreOptions options = new
CoreOptions(schemaManager.latest().get().options());
+ boolean produceChangelog =
+ options.changelogProducer() !=
CoreOptions.ChangelogProducer.NONE;
+ // The option from the table may not align with the expiration config
+ boolean changelogDecoupled =
snapshotManager.earliestLongLivedChangelogId() != null;
Path snapshotPath = snapshotManager.snapshotPath(snapshotId);
Snapshot snapshot = Snapshot.fromPath(fileIO, snapshotPath);
@@ -634,6 +642,27 @@ public class TestFileStore extends KeyValueFileStore {
entry.file().fileName()));
}
+ // Add the 'APPEND' files that have 'DELETE' entries in this snapshot.
+ // These deleted files can be merged away by plan#splits,
+ // so they do not appear in the entries above.
+ // In other words, these files are no longer used (by the snapshot or changelog),
+ // but they can only be cleaned after this snapshot expires, so we add them
+ // to the used-file list.
+ if (changelogDecoupled && !produceChangelog) {
+ entries =
scan.withManifestList(snapshot.deltaManifests(manifestList)).plan().files();
+ for (ManifestEntry entry : entries) {
// deletion of 'APPEND' data files is delayed
+ if (entry.kind() == FileKind.DELETE
+ && entry.file().fileSource().orElse(FileSource.APPEND)
+ == FileSource.APPEND) {
+ result.add(
+ new Path(
+ pathFactory.bucketPath(entry.partition(),
entry.bucket()),
+ entry.file().fileName()));
+ }
+ }
+ }
+
return result;
}
@@ -645,6 +674,10 @@ public class TestFileStore extends KeyValueFileStore {
FileStorePathFactory pathFactory,
ManifestList manifestList) {
Set<Path> result = new HashSet<>();
+ SchemaManager schemaManager = new SchemaManager(fileIO,
snapshotManager.tablePath());
+ CoreOptions options = new
CoreOptions(schemaManager.latest().get().options());
+ boolean produceChangelog =
+ options.changelogProducer() !=
CoreOptions.ChangelogProducer.NONE;
Path changelogPath =
snapshotManager.longLivedChangelogPath(changelogId);
Changelog changelog = Changelog.fromPath(fileIO, changelogPath);
@@ -653,23 +686,50 @@ public class TestFileStore extends KeyValueFileStore {
result.add(changelogPath);
// manifest lists
+ if (!produceChangelog) {
+
result.add(pathFactory.toManifestListPath(changelog.baseManifestList()));
+
result.add(pathFactory.toManifestListPath(changelog.deltaManifestList()));
+ }
if (changelog.changelogManifestList() != null) {
result.add(pathFactory.toManifestListPath(changelog.changelogManifestList()));
}
// manifests
- List<ManifestFileMeta> manifests = new ArrayList<>();
- manifests.addAll(changelog.changelogManifests(manifestList));
+ List<ManifestFileMeta> manifests =
+ new ArrayList<>(changelog.changelogManifests(manifestList));
+ if (!produceChangelog) {
+ manifests.addAll(changelog.dataManifests(manifestList));
+ }
manifests.forEach(m ->
result.add(pathFactory.toManifestFilePath(m.fileName())));
// data file
- List<ManifestEntry> entries =
scan.withManifestList(manifests).plan().files();
- for (ManifestEntry entry : entries) {
- result.add(
- new Path(
- pathFactory.bucketPath(entry.partition(),
entry.bucket()),
- entry.file().fileName()));
+ // Not all manifests contain useful data files:
+ // (1) produceChangelog = 'true': data files in changelog manifests
+ // (2) produceChangelog = 'false': 'APPEND' data files in delta manifests
+
+ // delta file
+ if (!produceChangelog) {
+ for (ManifestEntry entry :
+
scan.withManifestList(changelog.deltaManifests(manifestList)).plan().files()) {
+ if (entry.file().fileSource().orElse(FileSource.APPEND) ==
FileSource.APPEND) {
+ result.add(
+ new Path(
+ pathFactory.bucketPath(entry.partition(),
entry.bucket()),
+ entry.file().fileName()));
+ }
+ }
+ } else {
+ // changelog
+ for (ManifestEntry entry :
+
scan.withManifestList(changelog.changelogManifests(manifestList))
+ .plan()
+ .files()) {
+ result.add(
+ new Path(
+ pathFactory.bucketPath(entry.partition(),
entry.bucket()),
+ entry.file().fileName()));
+ }
}
return result;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
index 7c725164e..6af2d3051 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
@@ -85,7 +85,7 @@ public class ExpireSnapshotsTest {
TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
TestKeyValueGenerator.getPrimaryKeys(
TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
- Collections.emptyMap(),
+ store.options().toMap(),
null));
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
index 129f928c9..7ca2b58d7 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
@@ -59,6 +59,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.ArrayList;
@@ -328,8 +330,10 @@ public class OrphanFilesCleanTest {
assertThat(result).containsExactlyInAnyOrderElementsOf(TestPojo.formatData(data));
}
- @Test
- public void testCleanOrphanFilesWithChangelogDecoupled() throws Exception {
+ @ValueSource(strings = {"none", "input"})
+ @ParameterizedTest(name = "changelog-producer = {0}")
+ public void testCleanOrphanFilesWithChangelogDecoupled(String
changelogProducer)
+ throws Exception {
// recreate the table with another option
this.write.close();
this.commit.close();
@@ -338,6 +342,7 @@ public class OrphanFilesCleanTest {
options.set(CoreOptions.CHANGELOG_PRODUCER,
CoreOptions.ChangelogProducer.INPUT);
options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 15);
options.set(CoreOptions.CHANGELOG_NUM_RETAINED_MAX, 20);
+ options.setString(CoreOptions.CHANGELOG_PRODUCER.key(),
changelogProducer);
FileStoreTable table = createFileStoreTable(rowType, options);
String commitUser = UUID.randomUUID().toString();
this.table = table;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 8026becfb..095511ae1 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -65,6 +65,8 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CompatibilityTestUtils;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.nio.file.Files;
@@ -1491,15 +1493,15 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
"1|2|200|binary|varbinary|mapKey:mapVal|multiset"));
}
- @Test
- public void testRollbackToTagWithChangelogDecoupled() throws Exception {
+ @ParameterizedTest(name = "changelog-producer = {0}")
+ @ValueSource(strings = {"none", "input"})
+ public void testRollbackToTagWithChangelogDecoupled(String
changelogProducer) throws Exception {
int commitTimes = ThreadLocalRandom.current().nextInt(100) + 6;
FileStoreTable table =
createFileStoreTable(
options ->
- options.set(
- CoreOptions.CHANGELOG_PRODUCER,
- CoreOptions.ChangelogProducer.INPUT));
+ options.setString(
+ CoreOptions.CHANGELOG_PRODUCER.key(),
changelogProducer));
prepareRollbackTable(commitTimes, table);
int t1 = 1;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
index 51c2c10e7..46093f2cb 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
@@ -29,6 +29,8 @@ import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TraceableFileIO;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.UUID;
@@ -65,8 +67,7 @@ public class ContinuousFromTimestampStartingScannerTest
extends ScannerTestBase
long timestamp = snapshotManager.snapshot(3).timeMillis();
ContinuousFromTimestampStartingScanner scanner =
- new ContinuousFromTimestampStartingScanner(
- snapshotManager, timestamp, false, false);
+ new ContinuousFromTimestampStartingScanner(snapshotManager,
timestamp, false);
StartingScanner.NextSnapshot result =
(StartingScanner.NextSnapshot) scanner.scan(snapshotReader);
assertThat(result.nextSnapshotId()).isEqualTo(3);
@@ -80,7 +81,7 @@ public class ContinuousFromTimestampStartingScannerTest
extends ScannerTestBase
SnapshotManager snapshotManager = table.snapshotManager();
ContinuousFromTimestampStartingScanner scanner =
new ContinuousFromTimestampStartingScanner(
- snapshotManager, System.currentTimeMillis(), false,
false);
+ snapshotManager, System.currentTimeMillis(), false);
assertThat(scanner.scan(snapshotReader)).isInstanceOf(StartingScanner.NoSnapshot.class);
}
@@ -100,8 +101,7 @@ public class ContinuousFromTimestampStartingScannerTest
extends ScannerTestBase
long timestamp = snapshotManager.snapshot(1).timeMillis();
ContinuousFromTimestampStartingScanner scanner =
- new ContinuousFromTimestampStartingScanner(
- snapshotManager, timestamp, false, false);
+ new ContinuousFromTimestampStartingScanner(snapshotManager,
timestamp, false);
StartingScanner.NextSnapshot result =
(StartingScanner.NextSnapshot) scanner.scan(snapshotReader);
// next snapshot
@@ -111,14 +111,15 @@ public class ContinuousFromTimestampStartingScannerTest
extends ScannerTestBase
commit.close();
}
- @Test
- public void testScanFromChangelog() throws Exception {
+ @ParameterizedTest(name = "changelog-producer = {0}")
+ @ValueSource(strings = {"none", "input"})
+ public void testScanFromChangelog(String changelogProducer) throws
Exception {
Options options = new Options();
options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 2);
options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 1);
options.set(CoreOptions.CHANGELOG_NUM_RETAINED_MAX, 20);
options.set(CoreOptions.CHANGELOG_NUM_RETAINED_MIN, 10);
- options.set(CoreOptions.CHANGELOG_PRODUCER,
CoreOptions.ChangelogProducer.INPUT);
+ options.setString(CoreOptions.CHANGELOG_PRODUCER.key(),
changelogProducer);
FileStoreTable table =
createFileStoreTable(
true,
@@ -158,20 +159,20 @@ public class ContinuousFromTimestampStartingScannerTest
extends ScannerTestBase
ContinuousFromTimestampStartingScanner scanner =
new ContinuousFromTimestampStartingScanner(
- snapshotManager,
snapshotManager.snapshot(3).timeMillis(), true, true);
+ snapshotManager,
snapshotManager.snapshot(3).timeMillis(), true);
StartingScanner.NextSnapshot result =
(StartingScanner.NextSnapshot) scanner.scan(snapshotReader);
assertThat(result.nextSnapshotId()).isEqualTo(3);
scanner =
new ContinuousFromTimestampStartingScanner(
- snapshotManager,
snapshotManager.snapshot(2).timeMillis(), true, true);
+ snapshotManager,
snapshotManager.snapshot(2).timeMillis(), true);
assertThat(((StartingScanner.NextSnapshot)
scanner.scan(snapshotReader)).nextSnapshotId())
.isEqualTo(2);
scanner =
new ContinuousFromTimestampStartingScanner(
- snapshotManager,
snapshotManager.changelog(1).timeMillis(), true, true);
+ snapshotManager,
snapshotManager.changelog(1).timeMillis(), true);
assertThat(((StartingScanner.NextSnapshot)
scanner.scan(snapshotReader)).nextSnapshotId())
.isEqualTo(1);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index c60b26f22..de3ab0c5f 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -33,6 +33,8 @@ import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.ArrayList;
import java.util.Arrays;
@@ -604,16 +606,18 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(3,
"c"));
}
- @Test
- public void testScanFromChangelog() throws Exception {
+ @ParameterizedTest(name = "changelog-producer = {0}")
+ @ValueSource(strings = {"none", "input"})
+ public void testScanFromChangelog(String changelogProducer) throws
Exception {
batchSql(
"CREATE TABLE IF NOT EXISTS T3 (a STRING, b STRING, c STRING,
PRIMARY KEY (a) NOT ENFORCED)\n"
- + " WITH ('changelog-producer'='input', 'bucket' =
'1', \n"
+ + " WITH ('changelog-producer'='%s', 'bucket' = '1',
\n"
+ " 'snapshot.num-retained.max' = '2',\n"
+ " 'snapshot.num-retained.min' = '1',\n"
+ " 'changelog.num-retained.max' = '3',\n"
+ " 'changelog.num-retained.min' = '1'\n"
- + ")");
+ + ")",
+ changelogProducer);
batchSql("INSERT INTO T3 VALUES ('1', '2', '3')");
batchSql("INSERT INTO T3 VALUES ('4', '5', '6')");