This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a51f58008 [core] Optimize full compaction for manifest entries (#2963)
a51f58008 is described below

commit a51f580082f3c00894712f0a6b65b8b2c4319f29
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Mar 7 22:07:36 2024 +0800

    [core] Optimize full compaction for manifest entries (#2963)
---
 .../java/org/apache/paimon/manifest/FileEntry.java | 45 ++++++++------
 .../org/apache/paimon/manifest/ManifestFile.java   | 19 +++---
 .../apache/paimon/manifest/ManifestFileMeta.java   | 69 ++++++++++++++++------
 .../paimon/manifest/ManifestFileMetaTest.java      |  4 +-
 .../paimon/manifest/ManifestFileMetaTestBase.java  |  1 +
 5 files changed, 93 insertions(+), 45 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index 1b7b2c8bb..46f36be7f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -23,6 +23,7 @@ import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.FileUtils;
 import org.apache.paimon.utils.Preconditions;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -30,7 +31,8 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
+import java.util.concurrent.Future;
+import java.util.function.Supplier;
 
 /** Entry representing a file. */
 public interface FileEntry {
@@ -117,23 +119,10 @@ public interface FileEntry {
             ManifestFile manifestFile,
             List<ManifestFileMeta> manifestFiles,
             Map<Identifier, ManifestEntry> map) {
-        List<CompletableFuture<List<ManifestEntry>>> manifestReadFutures =
-                manifestFiles.stream()
-                        .map(
-                                manifestFileMeta ->
-                                        CompletableFuture.supplyAsync(
-                                                () ->
-                                                        manifestFile.read(
-                                                                
manifestFileMeta.fileName()),
-                                                
FileUtils.COMMON_IO_FORK_JOIN_POOL))
-                        .collect(Collectors.toList());
-
-        try {
-            for (CompletableFuture<List<ManifestEntry>> taskResult : 
manifestReadFutures) {
-                mergeEntries(taskResult.get(), map);
-            }
-        } catch (ExecutionException | InterruptedException e) {
-            throw new RuntimeException("Failed to read manifest file.", e);
+        List<Supplier<List<ManifestEntry>>> manifestReadFutures =
+                readManifestEntries(manifestFile, manifestFiles);
+        for (Supplier<List<ManifestEntry>> taskResult : manifestReadFutures) {
+            mergeEntries(taskResult.get(), map);
         }
     }
 
@@ -167,6 +156,26 @@ public interface FileEntry {
         }
     }
 
+    static List<Supplier<List<ManifestEntry>>> readManifestEntries(
+            ManifestFile manifestFile, List<ManifestFileMeta> manifestFiles) {
+        List<Supplier<List<ManifestEntry>>> result = new ArrayList<>();
+        for (ManifestFileMeta file : manifestFiles) {
+            Future<List<ManifestEntry>> future =
+                    CompletableFuture.supplyAsync(
+                            () -> manifestFile.read(file.fileName()),
+                            FileUtils.COMMON_IO_FORK_JOIN_POOL);
+            result.add(
+                    () -> {
+                        try {
+                            return future.get();
+                        } catch (ExecutionException | InterruptedException e) {
+                            throw new RuntimeException("Failed to read 
manifest file.", e);
+                        }
+                    });
+        }
+        return result;
+    }
+
     static <T extends FileEntry> void assertNoDelete(Collection<T> entries) {
         for (T entry : entries) {
             Preconditions.checkState(
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index a434e2acd..4aa6c2931 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -81,14 +81,7 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> {
      * <p>NOTE: This method is atomic.
      */
     public List<ManifestFileMeta> write(List<ManifestEntry> entries) {
-        RollingFileWriter<ManifestEntry, ManifestFileMeta> writer =
-                new RollingFileWriter<>(
-                        () ->
-                                new ManifestEntryWriter(
-                                        writerFactory,
-                                        pathFactory.newPath(),
-                                        
CoreOptions.FILE_COMPRESSION.defaultValue()),
-                        suggestedFileSize);
+        RollingFileWriter<ManifestEntry, ManifestFileMeta> writer = 
createRollingWriter();
         try {
             writer.write(entries);
             writer.close();
@@ -98,6 +91,16 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> {
         return writer.result();
     }
 
+    public RollingFileWriter<ManifestEntry, ManifestFileMeta> 
createRollingWriter() {
+        return new RollingFileWriter<>(
+                () ->
+                        new ManifestEntryWriter(
+                                writerFactory,
+                                pathFactory.newPath(),
+                                CoreOptions.FILE_COMPRESSION.defaultValue()),
+                suggestedFileSize);
+    }
+
     private class ManifestEntryWriter extends SingleFileWriter<ManifestEntry, 
ManifestFileMeta> {
 
         private final TableStatsCollector partitionStatsCollector;
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
index beecb4482..c0bcdd061 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.manifest;
 
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.RollingFileWriter;
 import org.apache.paimon.manifest.FileEntry.Identifier;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -28,6 +29,7 @@ import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.IOUtils;
 import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 
 import org.slf4j.Logger;
@@ -41,8 +43,11 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
 /** Metadata of a manifest file. */
 public class ManifestFileMeta {
 
@@ -170,7 +175,7 @@ public class ManifestFileMeta {
             for (ManifestFileMeta manifest : newMetas) {
                 manifestFile.delete(manifest.fileName);
             }
-            throw e;
+            throw new RuntimeException(e);
         }
     }
 
@@ -229,7 +234,8 @@ public class ManifestFileMeta {
             ManifestFile manifestFile,
             long suggestedMetaSize,
             long sizeTrigger,
-            RowType partitionType) {
+            RowType partitionType)
+            throws Exception {
         // 1. should trigger full compaction
 
         List<ManifestFileMeta> base = new ArrayList<>();
@@ -311,15 +317,16 @@ public class ManifestFileMeta {
                     }
                 });
 
-        Map<Identifier, ManifestEntry> fullMerged = new LinkedHashMap<>();
+        List<ManifestEntry> mergedEntries = new ArrayList<>();
         for (; j < base.size(); j++) {
             ManifestFileMeta file = base.get(j);
-            FileEntry.mergeEntries(manifestFile.read(file.fileName), 
fullMerged);
             boolean contains = false;
-            for (Identifier identifier : deleteEntries) {
-                if (fullMerged.containsKey(identifier)) {
+            for (ManifestEntry entry : manifestFile.read(file.fileName)) {
+                checkArgument(entry.kind() == FileKind.ADD);
+                if (deleteEntries.contains(entry.identifier())) {
                     contains = true;
-                    break;
+                } else {
+                    mergedEntries.add(entry);
                 }
             }
             if (contains) {
@@ -327,25 +334,53 @@ public class ManifestFileMeta {
                 j++;
                 break;
             } else {
-                fullMerged.clear();
+                mergedEntries.clear();
                 result.add(file);
             }
         }
 
-        // 2.3. merge base files
+        // 2.3. merge
 
-        FileEntry.mergeEntries(manifestFile, base.subList(j, base.size()), 
fullMerged);
-        FileEntry.mergeEntries(deltaMerged.values(), fullMerged);
+        RollingFileWriter<ManifestEntry, ManifestFileMeta> writer =
+                manifestFile.createRollingWriter();
+        Exception exception = null;
+        try {
 
-        // 2.4. write new manifest files
+            // 2.3.1 merge mergedEntries
+            for (ManifestEntry entry : mergedEntries) {
+                writer.write(entry);
+            }
 
-        if (!fullMerged.isEmpty()) {
-            List<ManifestFileMeta> merged =
-                    manifestFile.write(new ArrayList<>(fullMerged.values()));
-            result.addAll(merged);
-            newMetas.addAll(merged);
+            // 2.3.2 merge base files
+            for (Supplier<List<ManifestEntry>> reader :
+                    FileEntry.readManifestEntries(manifestFile, 
base.subList(j, base.size()))) {
+                for (ManifestEntry entry : reader.get()) {
+                    checkArgument(entry.kind() == FileKind.ADD);
+                    if (!deleteEntries.contains(entry.identifier())) {
+                        writer.write(entry);
+                    }
+                }
+            }
+
+            // 2.3.3 merge deltaMerged
+            for (ManifestEntry entry : deltaMerged.values()) {
+                if (entry.kind() == FileKind.ADD) {
+                    writer.write(entry);
+                }
+            }
+        } catch (Exception e) {
+            exception = e;
+        } finally {
+            if (exception != null) {
+                IOUtils.closeQuietly(writer);
+                throw exception;
+            }
+            writer.close();
         }
 
+        List<ManifestFileMeta> merged = writer.result();
+        result.addAll(merged);
+        newMetas.addAll(merged);
         return Optional.of(result);
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
index 7ff95be36..c08543784 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
@@ -210,7 +210,7 @@ public class ManifestFileMetaTest extends ManifestFileMetaTestBase {
     }
 
     @Test
-    public void testTriggerFullCompaction() {
+    public void testTriggerFullCompaction() throws Exception {
         List<ManifestEntry> entries = new ArrayList<>();
         for (int i = 0; i < 16; i++) {
             entries.add(makeEntry(true, String.valueOf(i)));
@@ -260,7 +260,7 @@ public class ManifestFileMetaTest extends ManifestFileMetaTestBase {
     }
 
     @Test
-    public void testMultiPartitionsFullCompaction() {
+    public void testMultiPartitionsFullCompaction() throws Exception {
 
         List<ManifestFileMeta> input = createBaseManifestFileMetas(true);
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index 611876867..3a9754dec 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -107,6 +107,7 @@ public abstract class ManifestFileMetaTestBase {
                         .collect(Collectors.toList());
         List<String> entryBeforeMerge =
                 FileEntry.mergeEntries(inputEntry).stream()
+                        .filter(entry -> entry.kind() == FileKind.ADD)
                         .map(entry -> entry.kind() + "-" + 
entry.file().fileName())
                         .collect(Collectors.toList());
 

Reply via email to