This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 29c0ef6d61 [core] Parallelize manifest full compaction reads (#7778)
29c0ef6d61 is described below

commit 29c0ef6d61577ce82df3647304f93345c8e90188
Author: YeJunHao <[email protected]>
AuthorDate: Thu May 7 20:27:43 2026 +0800

    [core] Parallelize manifest full compaction reads (#7778)
---
 .../paimon/operation/ManifestFileMerger.java       |  69 +++++++----
 .../paimon/manifest/ManifestFileMetaTest.java      | 126 +++++++++++++++++++++
 .../paimon/manifest/ManifestFileMetaTestBase.java  |   5 +
 3 files changed, 181 insertions(+), 19 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java
index fd8321de04..cdcad1ed3e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java
@@ -21,7 +21,6 @@ package org.apache.paimon.operation;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.RollingFileWriter;
 import org.apache.paimon.manifest.FileEntry;
-import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestFileMeta;
@@ -43,7 +42,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Function;
 
+import static java.util.Collections.singletonList;
+import static 
org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Util for merging manifest files. */
@@ -239,27 +241,19 @@ public class ManifestFileMerger {
 
         RollingFileWriter<ManifestEntry, ManifestFileMeta> writer =
                 manifestFile.createRollingWriter();
+        Function<ManifestFileMeta, List<FullCompactionReadResult>> reader =
+                file ->
+                        singletonList(
+                                readForFullCompaction(
+                                        file, manifestFile, mustChange, 
deleteEntries));
         Exception exception = null;
         try {
-            for (ManifestFileMeta file : toBeMerged) {
-                List<ManifestEntry> entries = new ArrayList<>();
-                boolean requireChange = mustChange.test(file);
-                for (ManifestEntry entry : manifestFile.read(file.fileName(), 
file.fileSize())) {
-                    if (entry.kind() == FileKind.DELETE) {
-                        continue;
-                    }
-
-                    if (deleteEntries.contains(entry.identifier())) {
-                        requireChange = true;
-                    } else {
-                        entries.add(entry);
-                    }
-                }
-
-                if (requireChange) {
-                    writer.write(entries);
+            for (FullCompactionReadResult readResult :
+                    sequentialBatchedExecute(reader, toBeMerged, 
manifestReadParallelism)) {
+                if (readResult.requireChange) {
+                    writer.write(readResult.entries);
                 } else {
-                    result.add(file);
+                    result.add(readResult.file);
                 }
             }
         } catch (Exception e) {
@@ -278,6 +272,29 @@ public class ManifestFileMerger {
         return Optional.of(result);
     }
 
+    /**
+     * Reads one manifest file for full compaction and decides whether it has to be rewritten.
+     *
+     * <p>Entries are read with {@code FileEntry.addFilter()} as the entry filter (assumed to keep
+     * only ADD entries; confirm). Every entry whose identifier is contained in {@code
+     * deleteEntries} is left out of the returned list and forces a rewrite of the file.
+     *
+     * <p>This is invoked through {@code sequentialBatchedExecute}, so several files may be read
+     * at the same time. NOTE(review): {@code mustChange} and {@code deleteEntries} are shared by
+     * those concurrent calls and must tolerate concurrent use; confirm for every caller.
+     *
+     * @param file the manifest file to read
+     * @param manifestFile the reader used to load the entries of {@code file}
+     * @param mustChange predicate that forces a rewrite of {@code file} even if no entry is dropped
+     * @param deleteEntries identifiers of entries to drop
+     * @return the file, whether it must be rewritten, and the entries that were kept
+     */
+    private static FullCompactionReadResult readForFullCompaction(
+            ManifestFileMeta file,
+            ManifestFile manifestFile,
+            Filter<ManifestFileMeta> mustChange,
+            Set<FileEntry.Identifier> deleteEntries) {
+        List<ManifestEntry> entries = new ArrayList<>();
+        // A dropped entry below can only set this to true; it never clears a rewrite requested here.
+        boolean requireChange = mustChange.test(file);
+        for (ManifestEntry entry :
+                manifestFile.read(
+                        file.fileName(),
+                        file.fileSize(),
+                        FileEntry.addFilter(),
+                        Filter.alwaysTrue())) {
+            if (deleteEntries.contains(entry.identifier())) {
+                // Omit the deleted entry; the file can no longer be reused unchanged.
+                requireChange = true;
+            } else {
+                entries.add(entry);
+            }
+        }
+
+        return new FullCompactionReadResult(file, requireChange, entries);
+    }
+
     private static Set<BinaryRow> 
computeDeletePartitions(Set<FileEntry.Identifier> deleteEntries) {
         Set<BinaryRow> partitions = new HashSet<>();
         for (FileEntry.Identifier identifier : deleteEntries) {
@@ -285,4 +302,18 @@ public class ManifestFileMerger {
         }
         return partitions;
     }
+
+    /**
+     * Per-file outcome of {@code readForFullCompaction}. The caller writes {@code entries} when
+     * {@code requireChange} is true and otherwise keeps {@code file} as-is.
+     */
+    private static class FullCompactionReadResult {
+
+        /** The manifest file that was read. */
+        private final ManifestFileMeta file;
+        /** Whether the file must be rewritten instead of reused unchanged. */
+        private final boolean requireChange;
+        /** Entries of {@code file} that survived filtering; only used when rewriting. */
+        private final List<ManifestEntry> entries;
+
+        private FullCompactionReadResult(
+                ManifestFileMeta file, boolean requireChange, List<ManifestEntry> entries) {
+            this.file = file;
+            this.requireChange = requireChange;
+            this.entries = entries;
+        }
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
index 4dee7f0372..36b0d15f11 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
@@ -20,6 +20,8 @@ package org.apache.paimon.manifest;
 
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.SeekableInputStreamWrapper;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.operation.ManifestFileMerger;
 import org.apache.paimon.partition.PartitionPredicate;
@@ -49,7 +51,11 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -495,6 +501,38 @@ public class ManifestFileMetaTest extends ManifestFileMetaTestBase {
                         .collect(Collectors.toList()));
     }
 
+    /**
+     * Checks that {@code tryFullCompaction} reads manifest files concurrently when its last
+     * argument (assumed to be the manifest read parallelism) is 2.
+     *
+     * <p>{@link BlockingReadFileIO} holds the first read of each manifest stream until two
+     * streams are reading. Otherwise that read fails with an {@code IOException} after 3 seconds,
+     * so a sequential implementation cannot pass.
+     */
+    @Test
+    public void testFullCompactionReadManifestsInParallel() throws Exception {
+        BlockingReadFileIO fileIO = new BlockingReadFileIO();
+        manifestFile = createManifestFile(tempDir.toString(), fileIO);
+
+        // Four single-entry manifests, written before read blocking is enabled.
+        List<ManifestFileMeta> input = new ArrayList<>();
+        for (int i = 0; i < 4; i++) {
+            input.add(makeManifest(makeEntry(true, "parallel-" + i)));
+        }
+
+        List<ManifestFileMeta> newMetas = new ArrayList<>();
+        Optional<List<ManifestFileMeta>> fullCompacted;
+        // From here on, manifest input streams are wrapped so their first read blocks.
+        fileIO.blockManifestReads();
+        try {
+            fullCompacted =
+                    ManifestFileMerger.tryFullCompaction(
+                            input,
+                            newMetas,
+                            manifestFile,
+                            Long.MAX_VALUE,
+                            1,
+                            getPartitionType(),
+                            2);
+        } finally {
+            // Release any reader still waiting, even if compaction failed.
+            fileIO.stopBlockingManifestReads();
+        }
+
+        assertThat(fileIO.maxConcurrentManifestReads()).isGreaterThanOrEqualTo(2);
+        assertThat(fullCompacted).isPresent();
+        assertEquivalentEntries(input, fullCompacted.get());
+    }
+
     @RepeatedTest(10)
     public void testRandomFullCompaction() throws Exception {
         List<ManifestFileMeta> input = new ArrayList<>();
@@ -693,4 +731,92 @@ public class ManifestFileMetaTest extends ManifestFileMetaTestBase {
     ManifestFile getManifestFile() {
         return manifestFile;
     }
+
+    /**
+     * {@link LocalFileIO} used to prove that manifest reads overlap.
+     *
+     * <p>While blocking is enabled, input streams for paths containing {@code /manifest/} are
+     * wrapped. The first read on each wrapped stream waits until two wrapped streams have started
+     * reading, or until blocking is stopped. It gives up with an {@code IOException} after 3
+     * seconds. The peak number of simultaneously active wrapped streams is recorded.
+     */
+    private static class BlockingReadFileIO extends LocalFileIO {
+
+        // Whether newInputStream currently wraps manifest streams.
+        private final AtomicBoolean blockManifestReads = new AtomicBoolean(false);
+        // Wrapped streams that have started reading and are not yet closed.
+        private final AtomicInteger activeManifestReads = new AtomicInteger(0);
+        // Highest value ever observed for activeManifestReads.
+        private final AtomicInteger maxConcurrentManifestReads = new AtomicInteger(0);
+        // Counted down by each wrapped stream on its first read; reaching zero releases readers.
+        private final CountDownLatch readersReady = new CountDownLatch(2);
+        // Opened by the second arriving reader or by stopBlockingManifestReads().
+        private final CountDownLatch releaseReaders = new CountDownLatch(1);
+
+        /** Starts wrapping manifest streams opened after this call. */
+        private void blockManifestReads() {
+            blockManifestReads.set(true);
+        }
+
+        /** Stops wrapping new streams and releases any reader still waiting. */
+        private void stopBlockingManifestReads() {
+            blockManifestReads.set(false);
+            releaseReaders.countDown();
+        }
+
+        /** Returns the peak number of wrapped streams that were reading at the same time. */
+        private int maxConcurrentManifestReads() {
+            return maxConcurrentManifestReads.get();
+        }
+
+        @Override
+        public SeekableInputStream newInputStream(Path path) throws IOException {
+            SeekableInputStream inputStream = super.newInputStream(path);
+            // Only manifest paths are delayed, and only while blocking is enabled.
+            if (!blockManifestReads.get() || !path.toString().contains("/manifest/")) {
+                return inputStream;
+            }
+            return new BlockingSeekableInputStream(inputStream);
+        }
+
+        /**
+         * Stream wrapper whose first read blocks as described on {@link BlockingReadFileIO}.
+         *
+         * <p>NOTE(review): only {@code read()} and {@code read(byte[], int, int)} are intercepted;
+         * {@code read(byte[])} relies on the wrapper routing it through the three-argument
+         * overload. Confirm against {@code SeekableInputStreamWrapper}.
+         */
+        private class BlockingSeekableInputStream extends SeekableInputStreamWrapper {
+
+            // Per-stream flags without synchronization; presumably each stream is used by a
+            // single thread. Verify if the reader ever shares a stream.
+            private boolean entered;
+            private boolean closed;
+
+            private BlockingSeekableInputStream(SeekableInputStream inputStream) {
+                super(inputStream);
+            }
+
+            @Override
+            public int read() throws IOException {
+                beforeFirstRead();
+                return super.read();
+            }
+
+            @Override
+            public int read(byte[] b, int off, int len) throws IOException {
+                beforeFirstRead();
+                return super.read(b, off, len);
+            }
+
+            @Override
+            public void close() throws IOException {
+                try {
+                    super.close();
+                } finally {
+                    // Decrement at most once, and only for streams counted in beforeFirstRead.
+                    if (entered && !closed) {
+                        activeManifestReads.decrementAndGet();
+                    }
+                    closed = true;
+                }
+            }
+
+            /**
+             * On the first call only: counts this stream as active, updates the recorded peak,
+             * signals arrival, and waits for {@code releaseReaders}. The second arriving stream
+             * opens the latch itself, so two overlapping readers release each other.
+             *
+             * @throws IOException if no release happens within 3 seconds, or if interrupted
+             */
+            private void beforeFirstRead() throws IOException {
+                if (entered) {
+                    return;
+                }
+
+                entered = true;
+                int activeReads = activeManifestReads.incrementAndGet();
+                maxConcurrentManifestReads.accumulateAndGet(activeReads, Math::max);
+                readersReady.countDown();
+                if (readersReady.getCount() == 0) {
+                    releaseReaders.countDown();
+                }
+
+                // Bounded wait so a sequential implementation fails the test instead of hanging.
+                try {
+                    if (!releaseReaders.await(3, TimeUnit.SECONDS)) {
+                        throw new IOException("Manifest reads were not parallelized.");
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new IOException(e);
+                }
+            }
+        }
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index a75412c6fb..b336bd8dd5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -137,6 +137,11 @@ public abstract class ManifestFileMetaTestBase {
     protected ManifestFile createManifestFile(String pathStr) {
+        // Resolve the FileIO from the path, then delegate to the overload that accepts one.
         Path path = new Path(pathStr);
         FileIO fileIO = FileIOFinder.find(path);
+        return createManifestFile(pathStr, fileIO);
+    }
+
+    protected ManifestFile createManifestFile(String pathStr, FileIO fileIO) {
+        Path path = new Path(pathStr);
         return new ManifestFile.Factory(
                         fileIO,
                         new SchemaManager(fileIO, path),

Reply via email to