This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f7372b5bb [core] Committer operator memory optimization (#3592)
f7372b5bb is described below

commit f7372b5bbfed4181ca373d68b58104335614eab1
Author: DBG <[email protected]>
AuthorDate: Tue Jun 25 14:15:57 2024 +0800

    [core] Committer operator memory optimization (#3592)
---
 .../java/org/apache/paimon/manifest/FileEntry.java |  9 ++++++++
 .../apache/paimon/manifest/ManifestFileMeta.java   | 24 ++++++++++++++++++----
 2 files changed, 29 insertions(+), 4 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index e0a6d25b7..bafc1878d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -176,6 +176,15 @@ public interface FileEntry {
         return result;
     }
 
+    static Future<List<ManifestEntry>> readManifestEntry( // starts an asynchronous read of one manifest file
+            ManifestFile manifestFile, ManifestFileMeta file) {
+        Future<List<ManifestEntry>> future =
+                CompletableFuture.supplyAsync( // the supplier below runs on the executor passed as 2nd argument
+                        () -> manifestFile.read(file.fileName(), 
file.fileSize()),
+                        FileUtils.COMMON_IO_FORK_JOIN_POOL); // executor used for the read task
+        return future; // returned without waiting; the caller retrieves the entries via Future.get()
+    }
+
     static <T extends FileEntry> void assertNoDelete(Collection<T> entries) {
         for (T entry : entries) {
             Preconditions.checkState(
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
index 2dc091d88..c092c36d2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
@@ -43,7 +43,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.function.Supplier;
+import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
@@ -351,16 +351,32 @@ public class ManifestFileMeta {
             for (ManifestEntry entry : mergedEntries) {
                 writer.write(entry);
             }
+            mergedEntries.clear();
 
             // 2.3.2 merge base files
-            for (Supplier<List<ManifestEntry>> reader :
-                    FileEntry.readManifestEntries(manifestFile, 
base.subList(j, base.size()))) {
-                for (ManifestEntry entry : reader.get()) {
+            List<ManifestEntry> asyncManifestEntries = null;
+            for (; j < base.size(); j++) {
+                Future<List<ManifestEntry>> reader =
+                        FileEntry.readManifestEntry(manifestFile, base.get(j));
+                if (asyncManifestEntries != null) {
+                    for (ManifestEntry entry : asyncManifestEntries) {
+                        checkArgument(entry.kind() == FileKind.ADD);
+                        if (!deleteEntries.contains(entry.identifier())) {
+                            writer.write(entry);
+                        }
+                    }
+                }
+                asyncManifestEntries = reader.get();
+            }
+
+            if (asyncManifestEntries != null) {
+                for (ManifestEntry entry : asyncManifestEntries) {
                     checkArgument(entry.kind() == FileKind.ADD);
                     if (!deleteEntries.contains(entry.identifier())) {
                         writer.write(entry);
                     }
                 }
+                asyncManifestEntries.clear();
             }
 
             // 2.3.3 merge deltaMerged

Reply via email to