This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 29c0ef6d61 [core] Parallelize manifest full compaction reads (#7778)
29c0ef6d61 is described below
commit 29c0ef6d61577ce82df3647304f93345c8e90188
Author: YeJunHao <[email protected]>
AuthorDate: Thu May 7 20:27:43 2026 +0800
[core] Parallelize manifest full compaction reads (#7778)
---
.../paimon/operation/ManifestFileMerger.java | 69 +++++++----
.../paimon/manifest/ManifestFileMetaTest.java | 126 +++++++++++++++++++++
.../paimon/manifest/ManifestFileMetaTestBase.java | 5 +
3 files changed, 181 insertions(+), 19 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java
index fd8321de04..cdcad1ed3e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java
@@ -21,7 +21,6 @@ package org.apache.paimon.operation;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.manifest.FileEntry;
-import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
@@ -43,7 +42,10 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Function;
+import static java.util.Collections.singletonList;
+import static org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Util for merging manifest files. */
@@ -239,27 +241,19 @@ public class ManifestFileMerger {
RollingFileWriter<ManifestEntry, ManifestFileMeta> writer =
manifestFile.createRollingWriter();
+ Function<ManifestFileMeta, List<FullCompactionReadResult>> reader =
+ file ->
+ singletonList(
+ readForFullCompaction(
+ file, manifestFile, mustChange, deleteEntries));
Exception exception = null;
try {
- for (ManifestFileMeta file : toBeMerged) {
- List<ManifestEntry> entries = new ArrayList<>();
- boolean requireChange = mustChange.test(file);
- for (ManifestEntry entry : manifestFile.read(file.fileName(), file.fileSize())) {
- if (entry.kind() == FileKind.DELETE) {
- continue;
- }
-
- if (deleteEntries.contains(entry.identifier())) {
- requireChange = true;
- } else {
- entries.add(entry);
- }
- }
-
- if (requireChange) {
- writer.write(entries);
+ for (FullCompactionReadResult readResult :
+ sequentialBatchedExecute(reader, toBeMerged, manifestReadParallelism)) {
+ if (readResult.requireChange) {
+ writer.write(readResult.entries);
} else {
- result.add(file);
+ result.add(readResult.file);
}
}
} catch (Exception e) {
@@ -278,6 +272,29 @@ public class ManifestFileMerger {
return Optional.of(result);
}
+ private static FullCompactionReadResult readForFullCompaction(
+ ManifestFileMeta file,
+ ManifestFile manifestFile,
+ Filter<ManifestFileMeta> mustChange,
+ Set<FileEntry.Identifier> deleteEntries) {
+ List<ManifestEntry> entries = new ArrayList<>();
+ boolean requireChange = mustChange.test(file);
+ for (ManifestEntry entry :
+ manifestFile.read(
+ file.fileName(),
+ file.fileSize(),
+ FileEntry.addFilter(),
+ Filter.alwaysTrue())) {
+ if (deleteEntries.contains(entry.identifier())) {
+ requireChange = true;
+ } else {
+ entries.add(entry);
+ }
+ }
+
+ return new FullCompactionReadResult(file, requireChange, entries);
+ }
+
private static Set<BinaryRow> computeDeletePartitions(Set<FileEntry.Identifier> deleteEntries) {
Set<BinaryRow> partitions = new HashSet<>();
for (FileEntry.Identifier identifier : deleteEntries) {
@@ -285,4 +302,18 @@ public class ManifestFileMerger {
}
return partitions;
}
+
+ private static class FullCompactionReadResult {
+
+ private final ManifestFileMeta file;
+ private final boolean requireChange;
+ private final List<ManifestEntry> entries;
+
+ private FullCompactionReadResult(
+ ManifestFileMeta file, boolean requireChange, List<ManifestEntry> entries) {
+ this.file = file;
+ this.requireChange = requireChange;
+ this.entries = entries;
+ }
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
index 4dee7f0372..36b0d15f11 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
@@ -20,6 +20,8 @@ package org.apache.paimon.manifest;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.SeekableInputStreamWrapper;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.operation.ManifestFileMerger;
import org.apache.paimon.partition.PartitionPredicate;
@@ -49,7 +51,11 @@ import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -495,6 +501,38 @@ public class ManifestFileMetaTest extends ManifestFileMetaTestBase {
.collect(Collectors.toList()));
}
+ @Test
+ public void testFullCompactionReadManifestsInParallel() throws Exception {
+ BlockingReadFileIO fileIO = new BlockingReadFileIO();
+ manifestFile = createManifestFile(tempDir.toString(), fileIO);
+
+ List<ManifestFileMeta> input = new ArrayList<>();
+ for (int i = 0; i < 4; i++) {
+ input.add(makeManifest(makeEntry(true, "parallel-" + i)));
+ }
+
+ List<ManifestFileMeta> newMetas = new ArrayList<>();
+ Optional<List<ManifestFileMeta>> fullCompacted;
+ fileIO.blockManifestReads();
+ try {
+ fullCompacted =
+ ManifestFileMerger.tryFullCompaction(
+ input,
+ newMetas,
+ manifestFile,
+ Long.MAX_VALUE,
+ 1,
+ getPartitionType(),
+ 2);
+ } finally {
+ fileIO.stopBlockingManifestReads();
+ }
+
+ assertThat(fileIO.maxConcurrentManifestReads()).isGreaterThanOrEqualTo(2);
+ assertThat(fullCompacted).isPresent();
+ assertEquivalentEntries(input, fullCompacted.get());
+ }
+
@RepeatedTest(10)
public void testRandomFullCompaction() throws Exception {
List<ManifestFileMeta> input = new ArrayList<>();
@@ -693,4 +731,92 @@ public class ManifestFileMetaTest extends ManifestFileMetaTestBase {
ManifestFile getManifestFile() {
return manifestFile;
}
+
+ private static class BlockingReadFileIO extends LocalFileIO {
+
+ private final AtomicBoolean blockManifestReads = new AtomicBoolean(false);
+ private final AtomicInteger activeManifestReads = new AtomicInteger(0);
+ private final AtomicInteger maxConcurrentManifestReads = new AtomicInteger(0);
+ private final CountDownLatch readersReady = new CountDownLatch(2);
+ private final CountDownLatch releaseReaders = new CountDownLatch(1);
+
+ private void blockManifestReads() {
+ blockManifestReads.set(true);
+ }
+
+ private void stopBlockingManifestReads() {
+ blockManifestReads.set(false);
+ releaseReaders.countDown();
+ }
+
+ private int maxConcurrentManifestReads() {
+ return maxConcurrentManifestReads.get();
+ }
+
+ @Override
+ public SeekableInputStream newInputStream(Path path) throws IOException {
+ SeekableInputStream inputStream = super.newInputStream(path);
+ if (!blockManifestReads.get() || !path.toString().contains("/manifest/")) {
+ return inputStream;
+ }
+ return new BlockingSeekableInputStream(inputStream);
+ }
+
+ private class BlockingSeekableInputStream extends SeekableInputStreamWrapper {
+
+ private boolean entered;
+ private boolean closed;
+
+ private BlockingSeekableInputStream(SeekableInputStream inputStream) {
+ super(inputStream);
+ }
+
+ @Override
+ public int read() throws IOException {
+ beforeFirstRead();
+ return super.read();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ beforeFirstRead();
+ return super.read(b, off, len);
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ super.close();
+ } finally {
+ if (entered && !closed) {
+ activeManifestReads.decrementAndGet();
+ }
+ closed = true;
+ }
+ }
+
+ private void beforeFirstRead() throws IOException {
+ if (entered) {
+ return;
+ }
+
+ entered = true;
+ int activeReads = activeManifestReads.incrementAndGet();
+ maxConcurrentManifestReads.accumulateAndGet(activeReads, Math::max);
+ readersReady.countDown();
+ if (readersReady.getCount() == 0) {
+ releaseReaders.countDown();
+ }
+
+ try {
+ if (!releaseReaders.await(3, TimeUnit.SECONDS)) {
+ throw new IOException("Manifest reads were not parallelized.");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ }
+ }
+ }
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index a75412c6fb..b336bd8dd5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -137,6 +137,11 @@ public abstract class ManifestFileMetaTestBase {
protected ManifestFile createManifestFile(String pathStr) {
Path path = new Path(pathStr);
FileIO fileIO = FileIOFinder.find(path);
+ return createManifestFile(pathStr, fileIO);
+ }
+
+ protected ManifestFile createManifestFile(String pathStr, FileIO fileIO) {
+ Path path = new Path(pathStr);
return new ManifestFile.Factory(
fileIO,
new SchemaManager(fileIO, path),