This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f7372b5bb [core] Committer operator memory optimization (#3592)
f7372b5bb is described below
commit f7372b5bbfed4181ca373d68b58104335614eab1
Author: DBG <[email protected]>
AuthorDate: Tue Jun 25 14:15:57 2024 +0800
[core] Committer operator memory optimization (#3592)
---
.../java/org/apache/paimon/manifest/FileEntry.java | 9 ++++++++
.../apache/paimon/manifest/ManifestFileMeta.java | 24 ++++++++++++++++++----
2 files changed, 29 insertions(+), 4 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index e0a6d25b7..bafc1878d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -176,6 +176,15 @@ public interface FileEntry {
return result;
}
+    /**
+     * Starts reading every entry of one manifest file in the background on
+     * {@link FileUtils#COMMON_IO_FORK_JOIN_POOL}.
+     *
+     * <p>Callers obtain the entries via {@link Future#get()}; any exception raised while
+     * reading surfaces there, wrapped in an {@link java.util.concurrent.ExecutionException}.
+     *
+     * @param manifestFile reader used to load the manifest contents
+     * @param file metadata (file name and size) of the manifest file to read
+     * @return a future completing with the entries read from {@code file}
+     */
+    static Future<List<ManifestEntry>> readManifestEntry(
+            ManifestFile manifestFile, ManifestFileMeta file) {
+        return CompletableFuture.supplyAsync(
+                () -> manifestFile.read(file.fileName(), file.fileSize()),
+                FileUtils.COMMON_IO_FORK_JOIN_POOL);
+    }
+
static <T extends FileEntry> void assertNoDelete(Collection<T> entries) {
for (T entry : entries) {
Preconditions.checkState(
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
index 2dc091d88..c092c36d2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
@@ -43,7 +43,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.function.Supplier;
+import java.util.concurrent.Future;
import java.util.stream.Collectors;
import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
@@ -351,16 +351,32 @@ public class ManifestFileMeta {
for (ManifestEntry entry : mergedEntries) {
writer.write(entry);
}
+ mergedEntries.clear();
// 2.3.2 merge base files
- for (Supplier<List<ManifestEntry>> reader :
- FileEntry.readManifestEntries(manifestFile,
base.subList(j, base.size()))) {
- for (ManifestEntry entry : reader.get()) {
+ List<ManifestEntry> asyncManifestEntries = null;
+ for (; j < base.size(); j++) {
+ Future<List<ManifestEntry>> reader =
+ FileEntry.readManifestEntry(manifestFile, base.get(j));
+ if (asyncManifestEntries != null) {
+ for (ManifestEntry entry : asyncManifestEntries) {
+ checkArgument(entry.kind() == FileKind.ADD);
+ if (!deleteEntries.contains(entry.identifier())) {
+ writer.write(entry);
+ }
+ }
+ }
+ asyncManifestEntries = reader.get();
+ }
+
+ if (asyncManifestEntries != null) {
+ for (ManifestEntry entry : asyncManifestEntries) {
checkArgument(entry.kind() == FileKind.ADD);
if (!deleteEntries.contains(entry.identifier())) {
writer.write(entry);
}
}
+ asyncManifestEntries.clear();
}
// 2.3.3 merge deltaMerged