This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new dba05e0953 [core] Avoid getting file size for manifest list and writing file (#5224)
dba05e0953 is described below
commit dba05e09539cddb7247f5e27d0a8292acfade77a
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Mar 7 13:07:13 2025 +0800
[core] Avoid getting file size for manifest list and writing file (#5224)
---
.../src/main/java/org/apache/paimon/Changelog.java | 10 ++++
.../src/main/java/org/apache/paimon/Snapshot.java | 55 +++++++++++++++++++
.../iceberg/manifest/IcebergManifestFile.java | 4 +-
.../apache/paimon/io/KeyValueDataFileWriter.java | 2 +-
.../org/apache/paimon/io/RowDataFileWriter.java | 2 +-
.../org/apache/paimon/io/SingleFileWriter.java | 4 +-
.../org/apache/paimon/manifest/ManifestFile.java | 2 +-
.../org/apache/paimon/manifest/ManifestList.java | 11 ++--
.../paimon/operation/FileStoreCommitImpl.java | 62 ++++++++++++----------
.../src/main/java/org/apache/paimon/tag/Tag.java | 13 +++++
.../java/org/apache/paimon/utils/ObjectsFile.java | 13 +++--
.../test/java/org/apache/paimon/SnapshotTest.java | 3 ++
.../apache/paimon/manifest/ManifestListTest.java | 4 +-
.../apache/paimon/operation/FileDeletionTest.java | 2 +-
.../org/apache/paimon/tag/TagAutoManagerTest.java | 6 +++
.../test/java/org/apache/paimon/tag/TagTest.java | 3 ++
.../apache/paimon/utils/SnapshotManagerTest.java | 15 ++++++
17 files changed, 165 insertions(+), 46 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/Changelog.java
b/paimon-core/src/main/java/org/apache/paimon/Changelog.java
index 79c65ba570..672c57a21a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/Changelog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/Changelog.java
@@ -47,8 +47,11 @@ public class Changelog extends Snapshot {
snapshot.id(),
snapshot.schemaId(),
snapshot.baseManifestList(),
+ snapshot.baseManifestListSize(),
snapshot.deltaManifestList(),
+ snapshot.deltaManifestListSize(),
snapshot.changelogManifestList(),
+ snapshot.changelogManifestListSize(),
snapshot.indexManifest(),
snapshot.commitUser(),
snapshot.commitIdentifier(),
@@ -68,8 +71,12 @@ public class Changelog extends Snapshot {
@JsonProperty(FIELD_ID) long id,
@JsonProperty(FIELD_SCHEMA_ID) long schemaId,
@JsonProperty(FIELD_BASE_MANIFEST_LIST) String baseManifestList,
+ @JsonProperty(FIELD_BASE_MANIFEST_LIST_SIZE) @Nullable Long
baseManifestListSize,
@JsonProperty(FIELD_DELTA_MANIFEST_LIST) String deltaManifestList,
+ @JsonProperty(FIELD_DELTA_MANIFEST_LIST_SIZE) @Nullable Long
deltaManifestListSize,
@JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST) @Nullable String
changelogManifestList,
+ @JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST_SIZE) @Nullable
+ Long changelogManifestListSize,
@JsonProperty(FIELD_INDEX_MANIFEST) @Nullable String indexManifest,
@JsonProperty(FIELD_COMMIT_USER) String commitUser,
@JsonProperty(FIELD_COMMIT_IDENTIFIER) long commitIdentifier,
@@ -86,8 +93,11 @@ public class Changelog extends Snapshot {
id,
schemaId,
baseManifestList,
+ baseManifestListSize,
deltaManifestList,
+ deltaManifestListSize,
changelogManifestList,
+ changelogManifestListSize,
indexManifest,
commitUser,
commitIdentifier,
diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
index 3b53315d18..e4daadb925 100644
--- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
+++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
@@ -75,8 +75,11 @@ public class Snapshot implements Serializable {
protected static final String FIELD_ID = "id";
protected static final String FIELD_SCHEMA_ID = "schemaId";
protected static final String FIELD_BASE_MANIFEST_LIST =
"baseManifestList";
+ protected static final String FIELD_BASE_MANIFEST_LIST_SIZE =
"baseManifestListSize";
protected static final String FIELD_DELTA_MANIFEST_LIST =
"deltaManifestList";
+ protected static final String FIELD_DELTA_MANIFEST_LIST_SIZE =
"deltaManifestListSize";
protected static final String FIELD_CHANGELOG_MANIFEST_LIST =
"changelogManifestList";
+ protected static final String FIELD_CHANGELOG_MANIFEST_LIST_SIZE =
"changelogManifestListSize";
protected static final String FIELD_INDEX_MANIFEST = "indexManifest";
protected static final String FIELD_COMMIT_USER = "commitUser";
protected static final String FIELD_COMMIT_IDENTIFIER = "commitIdentifier";
@@ -105,17 +108,32 @@ public class Snapshot implements Serializable {
@JsonProperty(FIELD_BASE_MANIFEST_LIST)
protected final String baseManifestList;
+ @JsonProperty(FIELD_BASE_MANIFEST_LIST_SIZE)
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ protected final Long baseManifestListSize;
+
// a manifest list recording all new changes occurred in this snapshot
// for faster expire and streaming reads
@JsonProperty(FIELD_DELTA_MANIFEST_LIST)
protected final String deltaManifestList;
+ @JsonProperty(FIELD_DELTA_MANIFEST_LIST_SIZE)
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ protected final Long deltaManifestListSize;
+
// a manifest list recording all changelog produced in this snapshot
// null if no changelog is produced, or for paimon <= 0.2
@JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST)
@Nullable
protected final String changelogManifestList;
+ @JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST_SIZE)
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ protected final Long changelogManifestListSize;
+
// a manifest recording all index files of this table
// null if no index file
@JsonProperty(FIELD_INDEX_MANIFEST)
@@ -185,8 +203,11 @@ public class Snapshot implements Serializable {
long id,
long schemaId,
String baseManifestList,
+ @Nullable Long baseManifestListSize,
String deltaManifestList,
+ @Nullable Long deltaManifestListSize,
@Nullable String changelogManifestList,
+ @Nullable Long changelogManifestListSize,
@Nullable String indexManifest,
String commitUser,
long commitIdentifier,
@@ -203,8 +224,11 @@ public class Snapshot implements Serializable {
id,
schemaId,
baseManifestList,
+ baseManifestListSize,
deltaManifestList,
+ deltaManifestListSize,
changelogManifestList,
+ changelogManifestListSize,
indexManifest,
commitUser,
commitIdentifier,
@@ -224,8 +248,12 @@ public class Snapshot implements Serializable {
@JsonProperty(FIELD_ID) long id,
@JsonProperty(FIELD_SCHEMA_ID) long schemaId,
@JsonProperty(FIELD_BASE_MANIFEST_LIST) String baseManifestList,
+ @JsonProperty(FIELD_BASE_MANIFEST_LIST_SIZE) @Nullable Long
baseManifestListSize,
@JsonProperty(FIELD_DELTA_MANIFEST_LIST) String deltaManifestList,
+ @JsonProperty(FIELD_DELTA_MANIFEST_LIST_SIZE) @Nullable Long
deltaManifestListSize,
@JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST) @Nullable String
changelogManifestList,
+ @JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST_SIZE) @Nullable
+ Long changelogManifestListSize,
@JsonProperty(FIELD_INDEX_MANIFEST) @Nullable String indexManifest,
@JsonProperty(FIELD_COMMIT_USER) String commitUser,
@JsonProperty(FIELD_COMMIT_IDENTIFIER) long commitIdentifier,
@@ -241,8 +269,11 @@ public class Snapshot implements Serializable {
this.id = id;
this.schemaId = schemaId;
this.baseManifestList = baseManifestList;
+ this.baseManifestListSize = baseManifestListSize;
this.deltaManifestList = deltaManifestList;
+ this.deltaManifestListSize = deltaManifestListSize;
this.changelogManifestList = changelogManifestList;
+ this.changelogManifestListSize = changelogManifestListSize;
this.indexManifest = indexManifest;
this.commitUser = commitUser;
this.commitIdentifier = commitIdentifier;
@@ -277,17 +308,35 @@ public class Snapshot implements Serializable {
return baseManifestList;
}
+ @JsonGetter(FIELD_BASE_MANIFEST_LIST_SIZE)
+ @Nullable
+ public Long baseManifestListSize() {
+ return baseManifestListSize;
+ }
+
@JsonGetter(FIELD_DELTA_MANIFEST_LIST)
public String deltaManifestList() {
return deltaManifestList;
}
+ @JsonGetter(FIELD_DELTA_MANIFEST_LIST_SIZE)
+ @Nullable
+ public Long deltaManifestListSize() {
+ return deltaManifestListSize;
+ }
+
@JsonGetter(FIELD_CHANGELOG_MANIFEST_LIST)
@Nullable
public String changelogManifestList() {
return changelogManifestList;
}
+ @JsonGetter(FIELD_CHANGELOG_MANIFEST_LIST_SIZE)
+ @Nullable
+ public Long changelogManifestListSize() {
+ return changelogManifestListSize;
+ }
+
@JsonGetter(FIELD_INDEX_MANIFEST)
@Nullable
public String indexManifest() {
@@ -361,8 +410,11 @@ public class Snapshot implements Serializable {
id,
schemaId,
baseManifestList,
+ baseManifestListSize,
deltaManifestList,
+ deltaManifestListSize,
changelogManifestList,
+ changelogManifestListSize,
indexManifest,
commitUser,
commitIdentifier,
@@ -385,8 +437,11 @@ public class Snapshot implements Serializable {
&& id == that.id
&& schemaId == that.schemaId
&& Objects.equals(baseManifestList, that.baseManifestList)
+ && Objects.equals(baseManifestListSize,
that.baseManifestListSize)
&& Objects.equals(deltaManifestList, that.deltaManifestList)
+ && Objects.equals(deltaManifestListSize,
that.deltaManifestListSize)
&& Objects.equals(changelogManifestList,
that.changelogManifestList)
+ && Objects.equals(changelogManifestListSize,
that.changelogManifestListSize)
&& Objects.equals(indexManifest, that.indexManifest)
&& Objects.equals(commitUser, that.commitUser)
&& commitIdentifier == that.commitIdentifier
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
index 4553a1c850..6e69602eb8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
@@ -236,7 +236,7 @@ public class IcebergManifestFile extends
ObjectsFile<IcebergManifestEntry> {
}
@Override
- public IcebergManifestFileMeta result() throws IOException {
+ public IcebergManifestFileMeta result() {
SimpleColStats[] stats = partitionStatsCollector.extract();
List<IcebergPartitionSummary> partitionSummaries = new
ArrayList<>();
for (int i = 0; i < stats.length; i++) {
@@ -251,7 +251,7 @@ public class IcebergManifestFile extends
ObjectsFile<IcebergManifestEntry> {
}
return new IcebergManifestFileMeta(
path.toString(),
- fileIO.getFileSize(path),
+ outputBytes,
IcebergPartitionSpec.SPEC_ID,
Content.DATA,
sequenceNumber,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
index e655f0ed3a..c15d54310d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -169,7 +169,7 @@ public abstract class KeyValueDataFileWriter
return null;
}
- long fileSize = fileIO.getFileSize(path);
+ long fileSize = outputBytes;
Pair<SimpleColStats[], SimpleColStats[]> keyValueStats =
fetchKeyValueStats(fieldStats(fileSize));
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index a21041a3ab..397157be0d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -109,7 +109,7 @@ public class RowDataFileWriter extends
StatsCollectingSingleFileWriter<InternalR
@Override
public DataFileMeta result() throws IOException {
- long fileSize = fileIO.getFileSize(path);
+ long fileSize = outputBytes;
Pair<List<String>, SimpleStats> statsPair =
statsArraySerializer.toBinary(fieldStats(fileSize));
DataFileIndexWriter.FileIndexResult indexResult =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
index f303e85978..ec80244113 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
@@ -52,6 +52,7 @@ public abstract class SingleFileWriter<T, R> implements
FileWriter<T, R> {
private FormatWriter writer;
private PositionOutputStream out;
+ protected long outputBytes;
private long recordCount;
protected boolean closed;
@@ -170,7 +171,7 @@ public abstract class SingleFileWriter<T, R> implements
FileWriter<T, R> {
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Closing file " + path);
+ LOG.debug("Closing file {}", path);
}
try {
@@ -180,6 +181,7 @@ public abstract class SingleFileWriter<T, R> implements
FileWriter<T, R> {
}
if (out != null) {
out.flush();
+ outputBytes = out.getPos();
out.close();
out = null;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index 2bcca9c906..3bb1ddf480 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -165,7 +165,7 @@ public class ManifestFile extends
ObjectsFile<ManifestEntry> {
public ManifestFileMeta result() throws IOException {
return new ManifestFileMeta(
path.getName(),
- fileIO.getFileSize(path),
+ outputBytes,
numAddedFiles,
numDeletedFiles,
partitionStatsSerializer.toBinaryAllMode(partitionStatsCollector.extract()),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
index 7768ddee51..ab2751ab70 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
@@ -27,6 +27,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.ObjectsFile;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.VersionedObjectSerializer;
@@ -83,7 +84,7 @@ public class ManifestList extends
ObjectsFile<ManifestFileMeta> {
*/
public List<ManifestFileMeta> readDataManifests(Snapshot snapshot) {
List<ManifestFileMeta> result = new ArrayList<>();
- result.addAll(read(snapshot.baseManifestList()));
+ result.addAll(read(snapshot.baseManifestList(),
snapshot.baseManifestListSize()));
result.addAll(readDeltaManifests(snapshot));
return result;
}
@@ -94,7 +95,7 @@ public class ManifestList extends
ObjectsFile<ManifestFileMeta> {
* @return a list of ManifestFileMeta.
*/
public List<ManifestFileMeta> readDeltaManifests(Snapshot snapshot) {
- return read(snapshot.deltaManifestList());
+ return read(snapshot.deltaManifestList(),
snapshot.deltaManifestListSize());
}
/**
@@ -105,7 +106,7 @@ public class ManifestList extends
ObjectsFile<ManifestFileMeta> {
public List<ManifestFileMeta> readChangelogManifests(Snapshot snapshot) {
return snapshot.changelogManifestList() == null
? Collections.emptyList()
- : read(snapshot.changelogManifestList());
+ : read(snapshot.changelogManifestList(),
snapshot.changelogManifestListSize());
}
/**
@@ -113,8 +114,8 @@ public class ManifestList extends
ObjectsFile<ManifestFileMeta> {
*
* <p>NOTE: This method is atomic.
*/
- public String write(List<ManifestFileMeta> metas) {
- return super.writeWithoutRolling(metas);
+ public Pair<String, Long> write(List<ManifestFileMeta> metas) {
+ return super.writeWithoutRolling(metas.iterator());
}
/** Creator of {@link ManifestList}. */
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index fc890b5c9a..20cacb9aea 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -877,10 +877,10 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
}
Snapshot newSnapshot;
- String baseManifestList = null;
- String deltaManifestList = null;
- List<PartitionEntry> deltaStatistics = null;
- String changelogManifestList = null;
+ Pair<String, Long> baseManifestList = null;
+ Pair<String, Long> deltaManifestList = null;
+ List<PartitionEntry> deltaStatistics;
+ Pair<String, Long> changelogManifestList = null;
String oldIndexManifest = null;
String indexManifest = null;
List<ManifestFileMeta> mergeBeforeManifests = new ArrayList<>();
@@ -974,9 +974,12 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
new Snapshot(
newSnapshotId,
latestSchemaId,
- baseManifestList,
- deltaManifestList,
- changelogManifestList,
+ baseManifestList.getLeft(),
+ baseManifestList.getRight(),
+ deltaManifestList.getKey(),
+ deltaManifestList.getRight(),
+ changelogManifestList == null ? null :
changelogManifestList.getKey(),
+ changelogManifestList == null ? null :
changelogManifestList.getRight(),
indexManifest,
commitUser,
identifier,
@@ -1110,16 +1113,19 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
}
}
- String baseManifestList = manifestList.write(mergeAfterManifests);
- String deltaManifestList = manifestList.write(emptyList());
+ Pair<String, Long> baseManifestList =
manifestList.write(mergeAfterManifests);
+ Pair<String, Long> deltaManifestList = manifestList.write(emptyList());
// prepare snapshot file
Snapshot newSnapshot =
new Snapshot(
latestSnapshot.id() + 1,
latestSnapshot.schemaId(),
- baseManifestList,
- deltaManifestList,
+ baseManifestList.getLeft(),
+ baseManifestList.getRight(),
+ deltaManifestList.getLeft(),
+ deltaManifestList.getRight(),
+ null,
null,
latestSnapshot.indexManifest(),
commitUser,
@@ -1403,11 +1409,11 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
}
private void cleanUpNoReuseTmpManifests(
- String baseManifestList,
+ Pair<String, Long> baseManifestList,
List<ManifestFileMeta> mergeBeforeManifests,
List<ManifestFileMeta> mergeAfterManifests) {
if (baseManifestList != null) {
- manifestList.delete(baseManifestList);
+ manifestList.delete(baseManifestList.getKey());
}
Set<String> oldMetaSet =
mergeBeforeManifests.stream()
@@ -1421,22 +1427,22 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
}
private void cleanUpReuseTmpManifests(
- String deltaManifestList,
- String changelogManifestList,
+ Pair<String, Long> deltaManifestList,
+ Pair<String, Long> changelogManifestList,
String oldIndexManifest,
String newIndexManifest) {
if (deltaManifestList != null) {
- for (ManifestFileMeta manifest :
manifestList.read(deltaManifestList)) {
+ for (ManifestFileMeta manifest :
manifestList.read(deltaManifestList.getKey())) {
manifestFile.delete(manifest.fileName());
}
- manifestList.delete(deltaManifestList);
+ manifestList.delete(deltaManifestList.getKey());
}
if (changelogManifestList != null) {
- for (ManifestFileMeta manifest :
manifestList.read(changelogManifestList)) {
+ for (ManifestFileMeta manifest :
manifestList.read(changelogManifestList.getKey())) {
manifestFile.delete(manifest.fileName());
}
- manifestList.delete(changelogManifestList);
+ manifestList.delete(changelogManifestList.getKey());
}
cleanIndexManifest(oldIndexManifest, newIndexManifest);
@@ -1518,8 +1524,8 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
private class RetryResult implements CommitResult {
private final List<PartitionEntry> deltaStatistics;
- private final String deltaManifestList;
- private final String changelogManifestList;
+ private final Pair<String, Long> deltaManifestList;
+ private final Pair<String, Long> changelogManifestList;
private final String oldIndexManifest;
private final String newIndexManifest;
@@ -1529,8 +1535,8 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
private RetryResult(
List<PartitionEntry> deltaStatistics,
- String deltaManifestList,
- String changelogManifestList,
+ Pair<String, Long> deltaManifestList,
+ Pair<String, Long> changelogManifestList,
String oldIndexManifest,
String newIndexManifest,
Snapshot latestSnapshot,
@@ -1557,14 +1563,14 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
private class ManifestCompactResult implements CommitResult {
- private final String baseManifestList;
- private final String deltaManifestList;
+ private final Pair<String, Long> baseManifestList;
+ private final Pair<String, Long> deltaManifestList;
private final List<ManifestFileMeta> mergeBeforeManifests;
private final List<ManifestFileMeta> mergeAfterManifests;
public ManifestCompactResult(
- String baseManifestList,
- String deltaManifestList,
+ Pair<String, Long> baseManifestList,
+ Pair<String, Long> deltaManifestList,
List<ManifestFileMeta> mergeBeforeManifests,
List<ManifestFileMeta> mergeAfterManifests) {
this.baseManifestList = baseManifestList;
@@ -1574,7 +1580,7 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
}
public void cleanAll() {
- manifestList.delete(deltaManifestList);
+ manifestList.delete(deltaManifestList.getKey());
cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests,
mergeAfterManifests);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java
b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java
index 53641a2eb6..9280f0006f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java
@@ -62,8 +62,12 @@ public class Tag extends Snapshot {
@JsonProperty(FIELD_ID) long id,
@JsonProperty(FIELD_SCHEMA_ID) long schemaId,
@JsonProperty(FIELD_BASE_MANIFEST_LIST) String baseManifestList,
+ @JsonProperty(FIELD_BASE_MANIFEST_LIST_SIZE) @Nullable Long
baseManifestListSize,
@JsonProperty(FIELD_DELTA_MANIFEST_LIST) String deltaManifestList,
+ @JsonProperty(FIELD_DELTA_MANIFEST_LIST_SIZE) @Nullable Long
deltaManifestListSize,
@JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST) @Nullable String
changelogManifestList,
+ @JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST_SIZE) @Nullable
+ Long changelogManifestListSize,
@JsonProperty(FIELD_INDEX_MANIFEST) @Nullable String indexManifest,
@JsonProperty(FIELD_COMMIT_USER) String commitUser,
@JsonProperty(FIELD_COMMIT_IDENTIFIER) long commitIdentifier,
@@ -82,8 +86,11 @@ public class Tag extends Snapshot {
id,
schemaId,
baseManifestList,
+ baseManifestListSize,
deltaManifestList,
+ deltaManifestListSize,
changelogManifestList,
+ changelogManifestListSize,
indexManifest,
commitUser,
commitIdentifier,
@@ -121,8 +128,11 @@ public class Tag extends Snapshot {
snapshot.id(),
snapshot.schemaId(),
snapshot.baseManifestList(),
+ snapshot.baseManifestListSize(),
snapshot.deltaManifestList(),
+ snapshot.deltaManifestListSize(),
snapshot.changelogManifestList(),
+ snapshot.changelogManifestListSize(),
snapshot.indexManifest(),
snapshot.commitUser(),
snapshot.commitIdentifier(),
@@ -144,8 +154,11 @@ public class Tag extends Snapshot {
id,
schemaId,
baseManifestList,
+ baseManifestListSize,
deltaManifestList,
+ deltaManifestListSize,
changelogManifestList,
+ changelogManifestListSize,
indexManifest,
commitUser,
commitIdentifier,
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
index 39cf9e4b98..54ae78def2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
@@ -154,20 +154,25 @@ public class ObjectsFile<T> implements
SimpleFileReader<T> {
}
public String writeWithoutRolling(Collection<T> records) {
- return writeWithoutRolling(records.iterator());
+ return writeWithoutRolling(records.iterator()).getKey();
}
- public String writeWithoutRolling(Iterator<T> records) {
+ protected Pair<String, Long> writeWithoutRolling(Iterator<T> records) {
Path path = pathFactory.newPath();
try {
- try (PositionOutputStream out = fileIO.newOutputStream(path,
false)) {
+ PositionOutputStream out = fileIO.newOutputStream(path, false);
+ long pos;
+ try {
try (FormatWriter writer = writerFactory.create(out,
compression)) {
while (records.hasNext()) {
writer.addElement(serializer.toRow(records.next()));
}
}
+ } finally {
+ pos = out.getPos();
+ out.close();
}
- return path.getName();
+ return Pair.of(path.getName(), pos);
} catch (Throwable e) {
fileIO.deleteQuietly(path);
throw new RuntimeException(
diff --git a/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java
b/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java
index 1cceeffbfa..97533039e0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java
@@ -72,6 +72,9 @@ public class SnapshotTest {
+ " \"unknownKey\" : 22222\n"
+ "}";
Snapshot snapshot = Snapshot.fromJson(json);
+ assertThat(snapshot.baseManifestListSize).isEqualTo(6);
+ assertThat(snapshot.deltaManifestListSize).isEqualTo(8);
+ assertThat(snapshot.changelogManifestListSize).isEqualTo(10);
assertThat(Snapshot.fromJson(snapshot.toJson())).isEqualTo(snapshot);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
index f8b69a04b0..a6aaf3530f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
@@ -53,7 +53,7 @@ public class ManifestListTest {
List<ManifestFileMeta> metas = generateData();
ManifestList manifestList = createManifestList(tempDir.toString());
- String manifestListName = manifestList.write(metas);
+ String manifestListName = manifestList.write(metas).getKey();
List<ManifestFileMeta> actualMetas =
manifestList.read(manifestListName);
assertThat(actualMetas).isEqualTo(metas);
}
@@ -80,7 +80,7 @@ public class ManifestListTest {
List<ManifestFileMeta> metas = generateData();
ManifestList manifestList = createManifestList(tempDir.toString());
- String manifestListName = manifestList.write(metas);
+ String manifestListName = manifestList.write(metas).getKey();
assertThat(manifestListName.startsWith("manifest-list-")).isTrue();
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 5c2e43b13b..2bb472b128 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -630,7 +630,7 @@ public class FileDeletionTest {
List<ManifestFileMeta> newManifests =
Arrays.asList(newAddManifests.get(0),
newDeleteManifests.get(0));
- String newManifestListName = manifestList.write(newManifests);
+ String newManifestListName = manifestList.write(newManifests).getKey();
fileIO.rename(
pathFactory.toManifestListPath(newManifestListName),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
index 407e42d5af..5acd26cf10 100644
--- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
@@ -396,6 +396,9 @@ public class TagAutoManagerTest extends
PrimaryKeyTableTestBase {
null,
null,
null,
+ null,
+ null,
+ null,
0L,
Snapshot.CommitKind.APPEND,
1000,
@@ -421,6 +424,9 @@ public class TagAutoManagerTest extends
PrimaryKeyTableTestBase {
null,
null,
null,
+ null,
+ null,
+ null,
0L,
Snapshot.CommitKind.APPEND,
1000,
diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java
b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java
index 3198366dd3..e877463fab 100644
--- a/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java
@@ -38,6 +38,9 @@ public class TagTest {
null,
null,
null,
+ null,
+ null,
+ null,
0L,
Snapshot.CommitKind.APPEND,
1000,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
index 8336804edc..02a5f499b1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
@@ -252,6 +252,9 @@ public class SnapshotManagerTest {
null,
null,
null,
+ null,
+ null,
+ null,
0L,
Snapshot.CommitKind.APPEND,
millis,
@@ -272,6 +275,9 @@ public class SnapshotManagerTest {
null,
null,
null,
+ null,
+ null,
+ null,
0L,
Snapshot.CommitKind.APPEND,
millis,
@@ -293,6 +299,9 @@ public class SnapshotManagerTest {
null,
null,
null,
+ null,
+ null,
+ null,
0L,
Snapshot.CommitKind.APPEND,
millis,
@@ -319,6 +328,9 @@ public class SnapshotManagerTest {
null,
null,
null,
+ null,
+ null,
+ null,
"lastCommitUser",
0L,
Snapshot.CommitKind.APPEND,
@@ -369,6 +381,9 @@ public class SnapshotManagerTest {
null,
null,
null,
+ null,
+ null,
+ null,
0L,
Snapshot.CommitKind.APPEND,
i * 1000,