This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a51f58008 [core] Optimize full compaction for manifest entries (#2963)
a51f58008 is described below
commit a51f580082f3c00894712f0a6b65b8b2c4319f29
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Mar 7 22:07:36 2024 +0800
[core] Optimize full compaction for manifest entries (#2963)
---
.../java/org/apache/paimon/manifest/FileEntry.java | 45 ++++++++------
.../org/apache/paimon/manifest/ManifestFile.java | 19 +++---
.../apache/paimon/manifest/ManifestFileMeta.java | 69 ++++++++++++++++------
.../paimon/manifest/ManifestFileMetaTest.java | 4 +-
.../paimon/manifest/ManifestFileMetaTestBase.java | 1 +
5 files changed, 93 insertions(+), 45 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index 1b7b2c8bb..46f36be7f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -23,6 +23,7 @@ import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.Preconditions;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
@@ -30,7 +31,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
+import java.util.concurrent.Future;
+import java.util.function.Supplier;
/** Entry representing a file. */
public interface FileEntry {
@@ -117,23 +119,10 @@ public interface FileEntry {
ManifestFile manifestFile,
List<ManifestFileMeta> manifestFiles,
Map<Identifier, ManifestEntry> map) {
- List<CompletableFuture<List<ManifestEntry>>> manifestReadFutures =
- manifestFiles.stream()
- .map(
- manifestFileMeta ->
- CompletableFuture.supplyAsync(
- () ->
- manifestFile.read(
- manifestFileMeta.fileName()),
- FileUtils.COMMON_IO_FORK_JOIN_POOL))
- .collect(Collectors.toList());
-
- try {
- for (CompletableFuture<List<ManifestEntry>> taskResult : manifestReadFutures) {
- mergeEntries(taskResult.get(), map);
- }
- } catch (ExecutionException | InterruptedException e) {
- throw new RuntimeException("Failed to read manifest file.", e);
+ List<Supplier<List<ManifestEntry>>> manifestReadFutures =
+ readManifestEntries(manifestFile, manifestFiles);
+ for (Supplier<List<ManifestEntry>> taskResult : manifestReadFutures) {
+ mergeEntries(taskResult.get(), map);
}
}
@@ -167,6 +156,26 @@ public interface FileEntry {
}
}
+ static List<Supplier<List<ManifestEntry>>> readManifestEntries(
+ ManifestFile manifestFile, List<ManifestFileMeta> manifestFiles) {
+ List<Supplier<List<ManifestEntry>>> result = new ArrayList<>();
+ for (ManifestFileMeta file : manifestFiles) {
+ Future<List<ManifestEntry>> future =
+ CompletableFuture.supplyAsync(
+ () -> manifestFile.read(file.fileName()),
+ FileUtils.COMMON_IO_FORK_JOIN_POOL);
+ result.add(
+ () -> {
+ try {
+ return future.get();
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException("Failed to read manifest file.", e);
+ }
+ });
+ }
+ return result;
+ }
+
static <T extends FileEntry> void assertNoDelete(Collection<T> entries) {
for (T entry : entries) {
Preconditions.checkState(
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index a434e2acd..4aa6c2931 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -81,14 +81,7 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> {
* <p>NOTE: This method is atomic.
*/
public List<ManifestFileMeta> write(List<ManifestEntry> entries) {
- RollingFileWriter<ManifestEntry, ManifestFileMeta> writer =
- new RollingFileWriter<>(
- () ->
- new ManifestEntryWriter(
- writerFactory,
- pathFactory.newPath(),
- CoreOptions.FILE_COMPRESSION.defaultValue()),
- suggestedFileSize);
+ RollingFileWriter<ManifestEntry, ManifestFileMeta> writer = createRollingWriter();
try {
writer.write(entries);
writer.close();
@@ -98,6 +91,16 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> {
return writer.result();
}
+ public RollingFileWriter<ManifestEntry, ManifestFileMeta> createRollingWriter() {
+ return new RollingFileWriter<>(
+ () ->
+ new ManifestEntryWriter(
+ writerFactory,
+ pathFactory.newPath(),
+ CoreOptions.FILE_COMPRESSION.defaultValue()),
+ suggestedFileSize);
+ }
+
private class ManifestEntryWriter extends SingleFileWriter<ManifestEntry, ManifestFileMeta> {
private final TableStatsCollector partitionStatsCollector;
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
index beecb4482..c0bcdd061 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
@@ -19,6 +19,7 @@
package org.apache.paimon.manifest;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.manifest.FileEntry.Identifier;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -28,6 +29,7 @@ import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.slf4j.Logger;
@@ -41,8 +43,11 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
/** Metadata of a manifest file. */
public class ManifestFileMeta {
@@ -170,7 +175,7 @@ public class ManifestFileMeta {
for (ManifestFileMeta manifest : newMetas) {
manifestFile.delete(manifest.fileName);
}
- throw e;
+ throw new RuntimeException(e);
}
}
@@ -229,7 +234,8 @@ public class ManifestFileMeta {
ManifestFile manifestFile,
long suggestedMetaSize,
long sizeTrigger,
- RowType partitionType) {
+ RowType partitionType)
+ throws Exception {
// 1. should trigger full compaction
List<ManifestFileMeta> base = new ArrayList<>();
@@ -311,15 +317,16 @@ public class ManifestFileMeta {
}
});
- Map<Identifier, ManifestEntry> fullMerged = new LinkedHashMap<>();
+ List<ManifestEntry> mergedEntries = new ArrayList<>();
for (; j < base.size(); j++) {
ManifestFileMeta file = base.get(j);
- FileEntry.mergeEntries(manifestFile.read(file.fileName), fullMerged);
boolean contains = false;
- for (Identifier identifier : deleteEntries) {
- if (fullMerged.containsKey(identifier)) {
+ for (ManifestEntry entry : manifestFile.read(file.fileName)) {
+ checkArgument(entry.kind() == FileKind.ADD);
+ if (deleteEntries.contains(entry.identifier())) {
contains = true;
- break;
+ } else {
+ mergedEntries.add(entry);
}
}
if (contains) {
@@ -327,25 +334,53 @@ public class ManifestFileMeta {
j++;
break;
} else {
- fullMerged.clear();
+ mergedEntries.clear();
result.add(file);
}
}
- // 2.3. merge base files
+ // 2.3. merge
- FileEntry.mergeEntries(manifestFile, base.subList(j, base.size()), fullMerged);
- FileEntry.mergeEntries(deltaMerged.values(), fullMerged);
+ RollingFileWriter<ManifestEntry, ManifestFileMeta> writer =
+ manifestFile.createRollingWriter();
+ Exception exception = null;
+ try {
- // 2.4. write new manifest files
+ // 2.3.1 merge mergedEntries
+ for (ManifestEntry entry : mergedEntries) {
+ writer.write(entry);
+ }
- if (!fullMerged.isEmpty()) {
- List<ManifestFileMeta> merged =
- manifestFile.write(new ArrayList<>(fullMerged.values()));
- result.addAll(merged);
- newMetas.addAll(merged);
+ // 2.3.2 merge base files
+ for (Supplier<List<ManifestEntry>> reader :
+ FileEntry.readManifestEntries(manifestFile, base.subList(j, base.size()))) {
+ for (ManifestEntry entry : reader.get()) {
+ checkArgument(entry.kind() == FileKind.ADD);
+ if (!deleteEntries.contains(entry.identifier())) {
+ writer.write(entry);
+ }
+ }
+ }
+
+ // 2.3.3 merge deltaMerged
+ for (ManifestEntry entry : deltaMerged.values()) {
+ if (entry.kind() == FileKind.ADD) {
+ writer.write(entry);
+ }
+ }
+ } catch (Exception e) {
+ exception = e;
+ } finally {
+ if (exception != null) {
+ IOUtils.closeQuietly(writer);
+ throw exception;
+ }
+ writer.close();
}
+ List<ManifestFileMeta> merged = writer.result();
+ result.addAll(merged);
+ newMetas.addAll(merged);
return Optional.of(result);
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
index 7ff95be36..c08543784 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
@@ -210,7 +210,7 @@ public class ManifestFileMetaTest extends ManifestFileMetaTestBase {
}
@Test
- public void testTriggerFullCompaction() {
+ public void testTriggerFullCompaction() throws Exception {
List<ManifestEntry> entries = new ArrayList<>();
for (int i = 0; i < 16; i++) {
entries.add(makeEntry(true, String.valueOf(i)));
@@ -260,7 +260,7 @@ public class ManifestFileMetaTest extends ManifestFileMetaTestBase {
}
@Test
- public void testMultiPartitionsFullCompaction() {
+ public void testMultiPartitionsFullCompaction() throws Exception {
List<ManifestFileMeta> input = createBaseManifestFileMetas(true);
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index 611876867..3a9754dec 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -107,6 +107,7 @@ public abstract class ManifestFileMetaTestBase {
.collect(Collectors.toList());
List<String> entryBeforeMerge =
FileEntry.mergeEntries(inputEntry).stream()
+ .filter(entry -> entry.kind() == FileKind.ADD)
.map(entry -> entry.kind() + "-" + entry.file().fileName())
.collect(Collectors.toList());