This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e6a7614da [core] Supports asynchronous manifest merge (#2391)
e6a7614da is described below
commit e6a7614da9107af466062c08b3aaca60ba264f0a
Author: Liwei Li <[email protected]>
AuthorDate: Mon Nov 27 10:24:55 2023 +0800
[core] Supports asynchronous manifest merge (#2391)
---
.../org/apache/paimon/manifest/ManifestEntry.java | 28 ++++++++++++++++++++++
.../apache/paimon/manifest/ManifestFileMeta.java | 13 +++-------
2 files changed, 31 insertions(+), 10 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index bf62cf795..4ca944ed8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -25,6 +25,7 @@ import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.Preconditions;
import java.util.ArrayList;
@@ -33,6 +34,9 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
import static org.apache.paimon.utils.SerializationUtils.newBytesType;
@@ -118,6 +122,30 @@ public class ManifestEntry {
return map.values();
}
+ public static void mergeEntries(
+ ManifestFile manifestFile,
+ List<ManifestFileMeta> manifestFiles,
+ Map<Identifier, ManifestEntry> map) {
+ List<CompletableFuture<List<ManifestEntry>>> manifestReadFutures =
+ manifestFiles.stream()
+ .map(
+ manifestFileMeta ->
+ CompletableFuture.supplyAsync(
+ () ->
+ manifestFile.read(
+ manifestFileMeta.fileName()),
+ FileUtils.COMMON_IO_FORK_JOIN_POOL))
+ .collect(Collectors.toList());
+
+ try {
+ for (CompletableFuture<List<ManifestEntry>> taskResult : manifestReadFutures) {
+ mergeEntries(taskResult.get(), map);
+ }
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException("Failed to read manifest file.", e);
+ }
+ }
+
public static void mergeEntries(
Iterable<ManifestEntry> entries, Map<Identifier, ManifestEntry> map) {
for (ManifestEntry entry : entries) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
index e4e7f707b..b917e27fb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
@@ -215,9 +215,7 @@ public class ManifestFileMeta {
}
Map<Identifier, ManifestEntry> map = new LinkedHashMap<>();
- for (ManifestFileMeta manifest : candidates) {
- ManifestEntry.mergeEntries(manifestFile.read(manifest.fileName), map);
- }
+ ManifestEntry.mergeEntries(manifestFile, candidates, map);
if (!map.isEmpty()) {
List<ManifestFileMeta> merged = manifestFile.write(new ArrayList<>(map.values()));
result.addAll(merged);
@@ -272,9 +270,7 @@ public class ManifestFileMeta {
// 2.1. try to skip base files by partition filter
Map<Identifier, ManifestEntry> deltaMerged = new LinkedHashMap<>();
- for (ManifestFileMeta manifest : delta) {
- ManifestEntry.mergeEntries(manifestFile.read(manifest.fileName), deltaMerged);
- }
+ ManifestEntry.mergeEntries(manifestFile, delta, deltaMerged);
List<ManifestFileMeta> result = new ArrayList<>();
int j = 0;
@@ -338,10 +334,7 @@ public class ManifestFileMeta {
// 2.3. merge base files
- for (; j < base.size(); j++) {
- ManifestFileMeta manifestFileMeta = base.get(j);
- ManifestEntry.mergeEntries(manifestFile.read(manifestFileMeta.fileName), fullMerged);
- }
+ ManifestEntry.mergeEntries(manifestFile, base.subList(j, base.size()), fullMerged);
ManifestEntry.mergeEntries(deltaMerged.values(), fullMerged);
// 2.4. write new manifest files