This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ba4d76da89 [core] Support manifest sort feature when commit (#7842)
ba4d76da89 is described below
commit ba4d76da89cfd4f2015479a3c5436901d9a281de
Author: umi <[email protected]>
AuthorDate: Thu Jun 4 13:54:46 2026 +0800
[core] Support manifest sort feature when commit (#7842)
---
docs/generated/core_configuration.html | 20 +-
.../main/java/org/apache/paimon/CoreOptions.java | 70 +-
.../paimon/operation/FileStoreCommitImpl.java | 20 +-
.../operation/ManifestAdjacentSortedRun.java | 102 ++
.../paimon/operation/ManifestFileMerger.java | 71 +-
.../paimon/operation/ManifestFileSorter.java | 1149 ++++++++++++++++++++
.../paimon/operation/ManifestPickStrategy.java | 149 +++
.../org/apache/paimon/schema/SchemaValidation.java | 20 +
.../paimon/manifest/ManifestFileMetaTest.java | 701 +++++++++++-
.../manifest/NoPartitionManifestFileMetaTest.java | 20 +-
.../apache/paimon/schema/SchemaValidationTest.java | 62 ++
11 files changed, 2326 insertions(+), 58 deletions(-)
diff --git a/docs/generated/core_configuration.html
b/docs/generated/core_configuration.html
index 8f7ea2eaae..1a66fb0ab0 100644
--- a/docs/generated/core_configuration.html
+++ b/docs/generated/core_configuration.html
@@ -925,7 +925,25 @@ Mainly to resolve data skew on primary keys. We recommend
starting with 64 mb wh
<td><h5>manifest.merge-min-count</h5></td>
<td style="word-wrap: break-word;">30</td>
<td>Integer</td>
- <td>To avoid frequent manifest merges, this parameter specifies
the minimum number of ManifestFileMeta to merge.</td>
+ <td>To avoid frequent manifest merges, this parameter specifies
the minimum number of ManifestFileMeta to merge.<br />Note: when
'manifest-sort.enabled' is true, this minimum-count gate is only applied to the
trailing sub-segment of a section that exceeds
'manifest-sort.max-rewrite-size'. Small under-budget sections are sorted and
rewritten directly, so two small manifest files may be merged into one even
when their count is below this threshold and full compaction is not tri [...]
+ </tr>
+ <tr>
+ <td><h5>manifest-sort.enabled</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to invoke manifest sort rewrite during commit.<br
/>Note: enabling this changes the semantics of 'manifest.merge-min-count'. In
the sort rewrite path, small manifest files within the rewrite budget are
sorted and merged directly, so the minimum-count gate no longer prevents
merging a small number of under-budget manifest files when full compaction is
not triggered.</td>
+ </tr>
+ <tr>
+ <td><h5>manifest-sort.partition-field</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Partition field name to sort manifest entries by. Validated by
schema validation, if not configured, defaults to the first partition
field.</td>
+ </tr>
+ <tr>
+ <td><h5>manifest-sort.max-rewrite-size</h5></td>
+ <td style="word-wrap: break-word;">256 mb</td>
+ <td>MemorySize</td>
+ <td>Maximum total size of manifest files to rewrite in a single
sort rewrite pass. Sections exceeding this limit are skipped. Set to a larger
value to allow more aggressive sort rewriting. The cap only limits the sorted
rewrite portion and full/minor cleanup may still happen beyond it.</td>
</tr>
<tr>
<td><h5>manifest.target-file-size</h5></td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 03140e9ecc..da734fa938 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -468,8 +468,61 @@ public class CoreOptions implements Serializable {
.intType()
.defaultValue(30)
.withDescription(
- "To avoid frequent manifest merges, this parameter
specifies the minimum number "
- + "of ManifestFileMeta to merge.");
+ Description.builder()
+ .text(
+ "To avoid frequent manifest
merges, this parameter specifies the minimum number "
+ + "of ManifestFileMeta to
merge.")
+ .linebreak()
+ .text(
+ "Note: when '"
+ + "manifest-sort.enabled"
+ + "' is true, this
minimum-count gate is only "
+ + "applied to the trailing
sub-segment of a "
+ + "section that exceeds '"
+ +
"manifest-sort.max-rewrite-size"
+ + "'. Small under-budget
sections are sorted "
+ + "and rewritten directly,
so two small manifest "
+ + "files may be merged
into one even when their "
+ + "count is below this
threshold and full "
+ + "compaction is not
triggered.")
+ .build());
+
+ public static final ConfigOption<Boolean> MANIFEST_SORT_ENABLED =
+ key("manifest-sort.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ Description.builder()
+ .text("Whether to invoke manifest sort
rewrite during commit.")
+ .linebreak()
+ .text(
+ "Note: enabling this changes the
semantics of '"
+ +
"manifest.merge-min-count"
+ + "'. In the sort rewrite
path, small manifest "
+ + "files within the
rewrite budget are sorted "
+ + "and merged directly, so
the minimum-count "
+ + "gate no longer prevents
merging a small "
+ + "number of under-budget
manifest files when "
+ + "full compaction is not
triggered.")
+ .build());
+
+ public static final ConfigOption<String> MANIFEST_SORT_PARTITION_FIELD =
+ key("manifest-sort.partition-field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Partition field name to sort manifest entries by.
Validated by"
+ + " schema validation, if not configured,
defaults to the first partition field.");
+
+ public static final ConfigOption<MemorySize>
MANIFEST_SORT_MAX_REWRITE_SIZE =
+ key("manifest-sort.max-rewrite-size")
+ .memoryType()
+ .defaultValue(MemorySize.ofMebiBytes(256))
+ .withDescription(
+ "Maximum total size of manifest files to rewrite
in a single"
+ + " sort rewrite pass. Sections exceeding
this limit are"
+ + " skipped. Set to a larger value to
allow more aggressive"
+ + " sort rewriting. The cap only limits
the sorted rewrite portion and full/minor cleanup may still happen beyond it.");
public static final ConfigOption<String> UPSERT_KEY =
key("upsert-key")
@@ -2614,6 +2667,19 @@ public class CoreOptions implements Serializable {
return options.get(MANIFEST_FULL_COMPACTION_FILE_SIZE);
}
+ public boolean manifestSortEnabled() {
+ return options.get(MANIFEST_SORT_ENABLED);
+ }
+
+ @Nullable
+ public String manifestSortPartitionField() {
+ return options.get(MANIFEST_SORT_PARTITION_FIELD);
+ }
+
+ public long manifestSortMaxRewriteSize() {
+ return options.get(MANIFEST_SORT_MAX_REWRITE_SIZE).getBytes();
+ }
+
public String partitionDefaultName() {
return options.get(PARTITION_DEFAULT_NAME);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 3f9fdb9f1c..10c9b20a04 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -55,6 +55,8 @@ import org.apache.paimon.operation.commit.StrictModeChecker;
import org.apache.paimon.operation.commit.SuccessCommitResult;
import org.apache.paimon.operation.metrics.CommitMetrics;
import org.apache.paimon.operation.metrics.CommitStats;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.partition.PartitionStatistics;
import org.apache.paimon.predicate.Predicate;
@@ -964,13 +966,7 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
// try to merge old manifest files to create base manifest list
mergeAfterManifests =
ManifestFileMerger.merge(
- mergeBeforeManifests,
- manifestFile,
- options.manifestTargetSize().getBytes(),
- options.manifestMergeMinCount(),
-
options.manifestFullCompactionThresholdSize().getBytes(),
- partitionType,
- options.scanManifestParallelism());
+ mergeBeforeManifests, manifestFile, partitionType,
options);
baseManifestList = manifestList.write(mergeAfterManifests);
if (options.rowTrackingEnabled()) {
@@ -1190,16 +1186,16 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
manifestList.readDataManifests(latestSnapshot);
List<ManifestFileMeta> mergeAfterManifests;
- // the fist trial
+ // the first trial: use a copy of the options with forced full compaction
settings
+ Options compactOptions = Options.fromMap(options.toMap());
+ compactOptions.set(CoreOptions.MANIFEST_MERGE_MIN_COUNT, 1);
+ compactOptions.set(CoreOptions.MANIFEST_FULL_COMPACTION_FILE_SIZE,
MemorySize.ofBytes(1));
mergeAfterManifests =
ManifestFileMerger.merge(
mergeBeforeManifests,
manifestFile,
- options.manifestTargetSize().getBytes(),
- 1,
- 1,
partitionType,
- options.scanManifestParallelism());
+ new CoreOptions(compactOptions));
if (new HashSet<>(mergeBeforeManifests).equals(new
HashSet<>(mergeAfterManifests))) {
// no need to commit this snapshot, because no compact were
happened
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestAdjacentSortedRun.java
b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestAdjacentSortedRun.java
new file mode 100644
index 0000000000..ca0797c213
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestAdjacentSortedRun.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.manifest.ManifestFileMeta;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * A {@code ManifestAdjacentSortedRun} is a list of {@link ManifestFileMeta}s
sorted by a single
+ * partition field (the configured manifest sort field). The intervals {@code
+ * [partitionStats.minValues[k], partitionStats.maxValues[k]]} of these
manifests do not overlap on
+ * field {@code k}, where {@code k} is the configured sort field index.
+ *
+ * <p><b>Boundary Equality:</b> Files with boundary-touching intervals (min ==
previous.max) are
+ * considered non-overlapping and can be placed in the same SortedRun. This
reduces the number of
+ * runs and improves compaction efficiency. However, such files may be
separated into different
+ * Sections during splitIntoSections to avoid merge-sort overhead.
+ */
+public class ManifestAdjacentSortedRun {
+
+ private int level;
+ private final List<ManifestFileMeta> files;
+ private final long totalSize;
+
+ private ManifestAdjacentSortedRun(List<ManifestFileMeta> files) {
+ this.level = -1;
+ this.files = Collections.unmodifiableList(files);
+ long size = 0L;
+ for (ManifestFileMeta file : files) {
+ size += file.fileSize();
+ }
+ this.totalSize = size;
+ }
+
+ /**
+ * Build a {@code ManifestAdjacentSortedRun} from an already-sorted list.
The caller MUST
+ * guarantee that {@code sortedFiles} is sorted ascending on the
configured sort field's min
+ * value, and that intervals do not overlap on that field.
+ */
+ public static ManifestAdjacentSortedRun fromSorted(List<ManifestFileMeta>
sortedFiles) {
+ return new ManifestAdjacentSortedRun(sortedFiles);
+ }
+
+ public List<ManifestFileMeta> files() {
+ return files;
+ }
+
+ public long totalSize() {
+ return totalSize;
+ }
+
+ public int level() {
+ return level;
+ }
+
+ public void setLevel(int level) {
+ this.level = level;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof ManifestAdjacentSortedRun)) {
+ return false;
+ }
+ ManifestAdjacentSortedRun that = (ManifestAdjacentSortedRun) o;
+ return level == that.level && files.equals(that.files);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(level, files);
+ }
+
+ @Override
+ public String toString() {
+ return "ManifestAdjacentSortedRun{level="
+ + level
+ + ", files=["
+ +
files.stream().map(ManifestFileMeta::fileName).collect(Collectors.joining(", "))
+ + "]}";
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java
b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java
index cdcad1ed3e..f899aa7178 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java
@@ -18,6 +18,7 @@
package org.apache.paimon.operation;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.manifest.FileEntry;
@@ -48,7 +49,7 @@ import static java.util.Collections.singletonList;
import static
org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
import static org.apache.paimon.utils.Preconditions.checkArgument;
-/** Util for merging manifest files. */
+/** Manifest file merger with standard merge logic and optional sort rewrite.
*/
public class ManifestFileMerger {
private static final Logger LOG =
LoggerFactory.getLogger(ManifestFileMerger.class);
@@ -62,33 +63,44 @@ public class ManifestFileMerger {
public static List<ManifestFileMeta> merge(
List<ManifestFileMeta> input,
ManifestFile manifestFile,
- long suggestedMetaSize,
- int suggestedMinMetaCount,
- long manifestFullCompactionSize,
RowType partitionType,
- @Nullable Integer manifestReadParallelism) {
+ CoreOptions options) {
+ // Extract configuration from options
+ long suggestedMetaSize = options.manifestTargetSize().getBytes();
+ int suggestedMinMetaCount = options.manifestMergeMinCount();
+ long manifestFullCompactionSize =
options.manifestFullCompactionThresholdSize().getBytes();
+ Integer manifestReadParallelism = options.scanManifestParallelism();
+
// these are the newly created manifest files, clean them up if
exception occurs
List<ManifestFileMeta> newFilesForAbort = new ArrayList<>();
try {
- Optional<List<ManifestFileMeta>> fullCompacted =
- tryFullCompaction(
- input,
- newFilesForAbort,
- manifestFile,
- suggestedMetaSize,
- manifestFullCompactionSize,
- partitionType,
- manifestReadParallelism);
- return fullCompacted.orElseGet(
- () ->
- tryMinorCompaction(
- input,
- newFilesForAbort,
- manifestFile,
- suggestedMetaSize,
- suggestedMinMetaCount,
- manifestReadParallelism));
+ // If 'manifest-sort.enabled' is true and the table has partition
fields, delegate to
+ // ManifestFileSorter.trySortCompaction
+ if (options.manifestSortEnabled() && partitionType.getFieldCount()
> 0) {
+ return ManifestFileSorter.trySortCompaction(
+ input, newFilesForAbort, manifestFile, partitionType,
options);
+ } else {
+ // Otherwise try full compaction first, then minor compaction
if needed
+ Optional<List<ManifestFileMeta>> fullCompacted =
+ tryFullCompaction(
+ input,
+ newFilesForAbort,
+ manifestFile,
+ suggestedMetaSize,
+ manifestFullCompactionSize,
+ partitionType,
+ manifestReadParallelism);
+ return fullCompacted.orElseGet(
+ () ->
+ tryMinorCompaction(
+ input,
+ newFilesForAbort,
+ manifestFile,
+ suggestedMetaSize,
+ suggestedMinMetaCount,
+ manifestReadParallelism));
+ }
} catch (Throwable e) {
// exception occurs, clean up and rethrow
for (ManifestFileMeta manifest : newFilesForAbort) {
@@ -234,7 +246,6 @@ public class ManifestFileMerger {
}
// 2.2. merge
-
if (toBeMerged.size() <= 1) {
return Optional.empty();
}
@@ -295,7 +306,7 @@ public class ManifestFileMerger {
return new FullCompactionReadResult(file, requireChange, entries);
}
- private static Set<BinaryRow>
computeDeletePartitions(Set<FileEntry.Identifier> deleteEntries) {
+ static Set<BinaryRow> computeDeletePartitions(Set<FileEntry.Identifier>
deleteEntries) {
Set<BinaryRow> partitions = new HashSet<>();
for (FileEntry.Identifier identifier : deleteEntries) {
partitions.add(identifier.partition);
@@ -303,13 +314,13 @@ public class ManifestFileMerger {
return partitions;
}
- private static class FullCompactionReadResult {
+ static class FullCompactionReadResult {
- private final ManifestFileMeta file;
- private final boolean requireChange;
- private final List<ManifestEntry> entries;
+ final ManifestFileMeta file;
+ final boolean requireChange;
+ final List<ManifestEntry> entries;
- private FullCompactionReadResult(
+ FullCompactionReadResult(
ManifestFileMeta file, boolean requireChange,
List<ManifestEntry> entries) {
this.file = file;
this.requireChange = requireChange;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileSorter.java
b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileSorter.java
new file mode 100644
index 0000000000..39ef0bab52
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileSorter.java
@@ -0,0 +1,1149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.manifest.FileEntry;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.function.Function;
+
+import static java.util.Collections.singletonList;
+import static
org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
+
+/** Manifest file sorter that sorts and rewrites manifest files by a
configured partition field. */
+public class ManifestFileSorter {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ManifestFileSorter.class);
+
+ /** Context object that carries shared state across compaction methods. */
+ static class CompactionContext {
+ final boolean fullCompaction;
+ final RecordComparator fieldComparator;
+ final Set<FileEntry.Identifier> deleteEntries;
+ final Map<ManifestFileMeta, Boolean> defaultCompactionMap;
+ final List<ManifestAdjacentSortedRun> levelRuns;
+ final List<ManifestAdjacentSortedRun> pickedRuns;
+
+ CompactionContext(
+ boolean fullCompaction,
+ RecordComparator fieldComparator,
+ Set<FileEntry.Identifier> deleteEntries,
+ Map<ManifestFileMeta, Boolean> defaultCompactionMap,
+ List<ManifestAdjacentSortedRun> levelRuns,
+ List<ManifestAdjacentSortedRun> pickedRuns) {
+ this.fullCompaction = fullCompaction;
+ this.fieldComparator = fieldComparator;
+ this.deleteEntries = deleteEntries;
+ this.defaultCompactionMap = defaultCompactionMap;
+ this.levelRuns = levelRuns;
+ this.pickedRuns = pickedRuns;
+ }
+ }
+
+ /** Result of classifying manifest files. */
+ private static class ClassifyResult {
+ final List<ManifestFileMeta> lsmFiles;
+ final Set<FileEntry.Identifier> deleteEntries;
+ final Map<ManifestFileMeta, Boolean> defaultCompactionMap;
+
+ ClassifyResult(
+ List<ManifestFileMeta> lsmFiles,
+ Set<FileEntry.Identifier> deleteEntries,
+ Map<ManifestFileMeta, Boolean> defaultCompactionMap) {
+ this.lsmFiles = lsmFiles;
+ this.deleteEntries = deleteEntries;
+ this.defaultCompactionMap = defaultCompactionMap;
+ }
+ }
+
+ /**
+ * Try to sort-rewrite the merged manifest list by a configured partition
field. If the sort
+ * field cannot be resolved, an {@link IllegalArgumentException} is thrown.
+ *
+ * <p>Tries {@link #tryFullCompaction} first (it only runs when totalDeltaFileSize >=
fullCompactionThreshold) and falls back to {@link
+ * #tryMinorCompaction} when it returns empty.
+ */
+ static List<ManifestFileMeta> trySortCompaction(
+ List<ManifestFileMeta> input,
+ List<ManifestFileMeta> newFilesForAbort,
+ ManifestFile manifestFile,
+ RowType partitionType,
+ CoreOptions options)
+ throws Exception {
+ String sortPartitionField = options.manifestSortPartitionField();
+ long suggestedMetaSize = options.manifestTargetSize().getBytes();
+ int suggestedMinMetaCount = options.manifestMergeMinCount();
+ long fullCompactionThreshold =
options.manifestFullCompactionThresholdSize().getBytes();
+ long maxRewriteSize = options.manifestSortMaxRewriteSize();
+ int maxSizeAmplificationPercent =
options.maxSizeAmplificationPercent();
+ int sortedRunSizeRatio = options.sortedRunSizeRatio();
+ Integer manifestReadParallelism = options.scanManifestParallelism();
+
+ Optional<List<ManifestFileMeta>> fullCompacted =
+ tryFullCompaction(
+ input,
+ newFilesForAbort,
+ manifestFile,
+ partitionType,
+ sortPartitionField,
+ suggestedMetaSize,
+ suggestedMinMetaCount,
+ fullCompactionThreshold,
+ maxRewriteSize,
+ maxSizeAmplificationPercent,
+ sortedRunSizeRatio,
+ manifestReadParallelism);
+ if (fullCompacted.isPresent()) {
+ return fullCompacted.get();
+ }
+ return tryMinorCompaction(
+ input,
+ newFilesForAbort,
+ manifestFile,
+ partitionType,
+ sortPartitionField,
+ suggestedMetaSize,
+ suggestedMinMetaCount,
+ maxRewriteSize,
+ maxSizeAmplificationPercent,
+ sortedRunSizeRatio,
+ manifestReadParallelism);
+ }
+
+ /**
+ * Full compaction path: runs only when totalDeltaFileSize >= fullCompactionThreshold.
+ *
+ * <p>Does not build index mapping. sortAndRewriteSection writes all
entries (ADD+DELETE merged)
+ * together without separating them.
+ */
+ private static Optional<List<ManifestFileMeta>> tryFullCompaction(
+ List<ManifestFileMeta> input,
+ List<ManifestFileMeta> newFilesForAbort,
+ ManifestFile manifestFile,
+ RowType partitionType,
+ String sortPartitionField,
+ long suggestedMetaSize,
+ int suggestedMinMetaCount,
+ long fullCompactionThreshold,
+ long maxRewriteSize,
+ int maxSizeAmplificationPercent,
+ int sortedRunSizeRatio,
+ @Nullable Integer manifestReadParallelism)
+ throws Exception {
+ // Step 1: Check if full compaction threshold is met
+ long totalDeltaFileSize = 0;
+ for (ManifestFileMeta file : input) {
+ if (file.numDeletedFiles() > 0 || file.fileSize() <
suggestedMetaSize) {
+ totalDeltaFileSize += file.fileSize();
+ }
+ }
+ if (totalDeltaFileSize < fullCompactionThreshold) {
+ return Optional.empty();
+ }
+ // Step 2: Prepare compaction context
+ CompactionContext ctx =
+ prepareCompaction(
+ input,
+ true,
+ manifestFile,
+ partitionType,
+ sortPartitionField,
+ suggestedMetaSize,
+ maxSizeAmplificationPercent,
+ sortedRunSizeRatio,
+ manifestReadParallelism);
+ List<ManifestAdjacentSortedRun> levelRuns = ctx.levelRuns;
+ List<ManifestAdjacentSortedRun> pickedRuns = ctx.pickedRuns;
+
+ if (pickedRuns.isEmpty() && ctx.defaultCompactionMap.isEmpty()) {
+ LOG.debug(
+ "Manifest sort full compact skipped: no runs picked and no
defaultCompaction files.");
+ return Optional.empty();
+ }
+
+ LOG.info(
+ "Manifest sort full compact: input={} files, lsm={} runs,
picked={} runs, "
+ + "defaultCompaction={} files.",
+ input.size(),
+ levelRuns.size(),
+ pickedRuns.size(),
+ ctx.defaultCompactionMap.size());
+
+ // Step 3: Collect reused files (not picked) and picked files
+ Set<ManifestAdjacentSortedRun> pickedSet = new HashSet<>(pickedRuns);
+ List<ManifestFileMeta> result = new ArrayList<>();
+ for (ManifestAdjacentSortedRun run : levelRuns) {
+ if (!pickedSet.contains(run)) {
+ result.addAll(run.files());
+ }
+ }
+ List<ManifestFileMeta> pickedFiles = new ArrayList<>();
+ for (ManifestAdjacentSortedRun run : pickedRuns) {
+ pickedFiles.addAll(run.files());
+ }
+ pickedFiles.addAll(ctx.defaultCompactionMap.keySet());
+
+ // Step 4: Split into sections and merge small adjacent sections
+ List<Section> sections =
+ splitIntoSections(pickedFiles, ctx.fieldComparator,
ctx.defaultCompactionMap);
+ sections = mergeSmallAdjacentSections(sections, suggestedMetaSize);
+
+ LOG.info(
+ "Manifest sort full compact: pickedFiles={}, sections={}.",
+ pickedFiles.size(),
+ sections.size());
+
+ // Step 5: Rewrite sections
+ FullCompactOutput output = new FullCompactOutput(result);
+ rewriteSections(
+ sections,
+ output,
+ newFilesForAbort,
+ ctx,
+ manifestFile,
+ suggestedMetaSize,
+ suggestedMinMetaCount,
+ maxRewriteSize,
+ manifestReadParallelism);
+
+ LOG.info(
+ "Manifest sort full compact completed: input={},
resultFiles={}.",
+ input.size(),
+ result.size());
+ return Optional.of(result);
+ }
+
+ /**
+ * Minor compaction path: used when {@link #tryFullCompaction} returns empty (e.g. below the threshold).
+ *
+ * <p>Builds index mapping to preserve original positions.
sortAndRewriteSection separates ADD
+ * and DELETE entries, placing ADD at result[minIdx] and DELETE at
result[maxIdx].
+ */
+ private static List<ManifestFileMeta> tryMinorCompaction(
+ List<ManifestFileMeta> input,
+ List<ManifestFileMeta> newFilesForAbort,
+ ManifestFile manifestFile,
+ RowType partitionType,
+ String sortPartitionField,
+ long suggestedMetaSize,
+ int suggestedMinMetaCount,
+ long maxRewriteSize,
+ int maxSizeAmplificationPercent,
+ int sortedRunSizeRatio,
+ @Nullable Integer manifestReadParallelism)
+ throws Exception {
+ // Step 1: Prepare compaction context (early-return if nothing to
compact)
+ CompactionContext ctx =
+ prepareCompaction(
+ input,
+ false,
+ manifestFile,
+ partitionType,
+ sortPartitionField,
+ suggestedMetaSize,
+ maxSizeAmplificationPercent,
+ sortedRunSizeRatio,
+ manifestReadParallelism);
+ List<ManifestAdjacentSortedRun> levelRuns = ctx.levelRuns;
+ List<ManifestAdjacentSortedRun> pickedRuns = ctx.pickedRuns;
+
+ if (pickedRuns.isEmpty() && ctx.defaultCompactionMap.isEmpty()) {
+ LOG.debug(
+ "Manifest sort minor compact skipped: no runs picked and
no defaultCompaction files.");
+ return input;
+ }
+
+ LOG.info(
+ "Manifest sort minor compact: input={} files, lsm={} runs,
picked={} runs, "
+ + "defaultCompaction={} files.",
+ input.size(),
+ levelRuns.size(),
+ pickedRuns.size(),
+ ctx.defaultCompactionMap.size());
+
+ // Step 2: Build fileName -> index mapping and initialize 2D result
+ Map<String, Integer> fileNameToIndex = new HashMap<>();
+ List<List<ManifestFileMeta>> result = new ArrayList<>(input.size());
+ for (int i = 0; i < input.size(); i++) {
+ fileNameToIndex.put(input.get(i).fileName(), i);
+ result.add(new ArrayList<>());
+ }
+
+ // Step 3: Collect reused files and picked files
+ Set<ManifestAdjacentSortedRun> pickedSet = new HashSet<>(pickedRuns);
+ for (ManifestAdjacentSortedRun run : levelRuns) {
+ if (!pickedSet.contains(run)) {
+ for (ManifestFileMeta file : run.files()) {
+ Integer idx = fileNameToIndex.get(file.fileName());
+ if (idx != null) {
+ result.get(idx).add(file);
+ }
+ }
+ }
+ }
+
+ List<ManifestFileMeta> pickedFiles = new ArrayList<>();
+ for (ManifestAdjacentSortedRun run : pickedRuns) {
+ pickedFiles.addAll(run.files());
+ }
+ pickedFiles.addAll(ctx.defaultCompactionMap.keySet());
+
+ // Step 4: Compute index range
+ int minIdx = Integer.MAX_VALUE;
+ int maxIdx = Integer.MIN_VALUE;
+ for (ManifestFileMeta meta : pickedFiles) {
+ Integer idx = fileNameToIndex.get(meta.fileName());
+ if (idx != null) {
+ minIdx = Math.min(minIdx, idx);
+ maxIdx = Math.max(maxIdx, idx);
+ }
+ }
+ Pair<Integer, Integer> indexRange = Pair.of(minIdx, maxIdx);
+
+ // Step 5: Split into sections and merge small adjacent sections
+ List<Section> sections =
+ splitIntoSections(pickedFiles, ctx.fieldComparator,
ctx.defaultCompactionMap);
+ sections = mergeSmallAdjacentSections(sections, suggestedMetaSize);
+
+ LOG.info(
+ "Manifest sort minor compact: pickedFiles={}, sections={}.",
+ pickedFiles.size(),
+ sections.size());
+
+ // Step 6: Rewrite sections
+ MinorCompactOutput output = new MinorCompactOutput(result, indexRange,
fileNameToIndex);
+ rewriteSections(
+ sections,
+ output,
+ newFilesForAbort,
+ ctx,
+ manifestFile,
+ suggestedMetaSize,
+ suggestedMinMetaCount,
+ maxRewriteSize,
+ manifestReadParallelism);
+
+ // Step 7: Flatten 2D result into a single list
+ List<ManifestFileMeta> flatResult = new ArrayList<>();
+ for (List<ManifestFileMeta> subList : result) {
+ flatResult.addAll(subList);
+ }
+
+ LOG.info(
+ "Manifest sort minor compact completed: input={},
resultFiles={}.",
+ input.size(),
+ flatResult.size());
+ return flatResult;
+ }
+
+    /**
+     * Prepares the shared state of one sorted manifest rewrite.
+     *
+     * <p>Runs four steps in order: resolve the sort field and build a comparator on it, classify
+     * the input manifests, group the LSM manifests into levelled sorted runs, and let {@link
+     * ManifestPickStrategy} pick the runs to compact.
+     *
+     * @param input manifest file metas to consider
+     * @param fullCompaction whether this is a full compaction; forwarded to classification and
+     *     stored in the returned context
+     * @param manifestFile manifest file handed to classification for reading delete entries
+     * @param partitionType partition row type in which the sort field is looked up
+     * @param sortPartitionField configured sort field; when null or empty the first partition
+     *     field is used
+     * @param suggestedMetaSize size threshold below which a manifest is classified as small
+     * @param maxSizeAmplificationPercent size-amplification threshold of the pick strategy
+     * @param sortedRunSizeRatio size-ratio threshold of the pick strategy
+     * @param manifestReadParallelism optional read parallelism, may be null
+     * @return CompactionContext containing all shared state
+     * @throws IllegalArgumentException if the sort field is missing or is not a field of {@code
+     *     partitionType}
+     */
+    private static CompactionContext prepareCompaction(
+            List<ManifestFileMeta> input,
+            boolean fullCompaction,
+            ManifestFile manifestFile,
+            RowType partitionType,
+            String sortPartitionField,
+            long suggestedMetaSize,
+            int maxSizeAmplificationPercent,
+            int sortedRunSizeRatio,
+            @Nullable Integer manifestReadParallelism) {
+
+        // Step 1: Resolve sort field and build comparator for partition ordering.
+        String sortField = resolveSortField(sortPartitionField, partitionType);
+        if (sortField == null) {
+            throw new IllegalArgumentException(
+                    "Cannot resolve sort field for manifest sort rewrite.");
+        }
+        int sortFieldIndex = partitionType.getFieldNames().indexOf(sortField);
+        if (sortFieldIndex < 0) {
+            // indexOf yields -1 for a name that is not a partition field; fail here with a clear
+            // message instead of handing an invalid field index to the comparator.
+            throw new IllegalArgumentException(
+                    "Cannot resolve sort field '"
+                            + sortField
+                            + "' for manifest sort rewrite. Available partition fields: "
+                            + partitionType.getFieldNames());
+        }
+        RecordComparator fieldComparator =
+                CodeGenUtils.newRecordComparator(
+                        partitionType.getFieldTypes(), new int[] {sortFieldIndex});
+
+        // Step 2: Classify manifests into LSM files and collect delete entries.
+        ClassifyResult classifyResult =
+                classifyManifests(
+                        input,
+                        fullCompaction,
+                        manifestFile,
+                        partitionType,
+                        suggestedMetaSize,
+                        manifestReadParallelism);
+        List<ManifestFileMeta> lsmFiles = classifyResult.lsmFiles;
+
+        // Step 3: Build level-sorted runs from LSM files based on partition order.
+        List<ManifestAdjacentSortedRun> levelRuns =
+                lsmFiles.isEmpty()
+                        ? new ArrayList<>()
+                        : buildLevelSortedRuns(lsmFiles, fieldComparator);
+
+        // Step 4: Pick runs for compaction using size amplification and ratio strategy.
+        ManifestPickStrategy pickStrategy =
+                new ManifestPickStrategy(maxSizeAmplificationPercent, sortedRunSizeRatio);
+        List<ManifestAdjacentSortedRun> pickedRuns = pickStrategy.pick(levelRuns);
+
+        return new CompactionContext(
+                fullCompaction,
+                fieldComparator,
+                classifyResult.deleteEntries,
+                classifyResult.defaultCompactionMap,
+                levelRuns,
+                pickedRuns);
+    }
+
+    /**
+     * Splits the input manifests into the default-compaction group and the LSM group.
+     *
+     * <p>A manifest goes to the default-compaction map when it is smaller than {@code
+     * suggestedMetaSize} or when its partition stats match the delete-partition predicate; the
+     * map value records whether the predicate matched. Every other manifest is returned as an LSM
+     * file, in input order.
+     *
+     * <p>The predicate only exists for full compaction: it is ALWAYS_FALSE when no delete entries
+     * were read, built from the delete partitions when the table has partition fields, and
+     * ALWAYS_TRUE otherwise. For non-full compaction only the size rule applies and no delete
+     * entries are read.
+     *
+     * @return ClassifyResult containing lsmFiles, deleteEntries, and defaultCompactionMap
+     */
+    private static ClassifyResult classifyManifests(
+            List<ManifestFileMeta> input,
+            boolean fullCompaction,
+            ManifestFile manifestFile,
+            RowType partitionType,
+            long suggestedMetaSize,
+            @Nullable Integer manifestReadParallelism) {
+        // Delete entries and the overlap predicate are only computed for full compaction.
+        Set<FileEntry.Identifier> deleteEntries = Collections.emptySet();
+        PartitionPredicate deleteRangePredicate = null;
+        if (fullCompaction) {
+            deleteEntries =
+                    FileEntry.readDeletedEntries(manifestFile, input, manifestReadParallelism);
+            if (deleteEntries.isEmpty()) {
+                deleteRangePredicate = PartitionPredicate.ALWAYS_FALSE;
+            } else if (partitionType.getFieldCount() > 0) {
+                Set<BinaryRow> deletePartitions =
+                        ManifestFileMerger.computeDeletePartitions(deleteEntries);
+                deleteRangePredicate =
+                        PartitionPredicate.fromMultiple(partitionType, deletePartitions);
+            } else {
+                deleteRangePredicate = PartitionPredicate.ALWAYS_TRUE;
+            }
+        }
+
+        // Route every manifest to exactly one of the two groups, preserving input order.
+        Map<ManifestFileMeta, Boolean> defaultCompactionMap = new LinkedHashMap<>();
+        List<ManifestFileMeta> lsmFiles = new LinkedList<>();
+        for (ManifestFileMeta meta : input) {
+            boolean inDeleteRange =
+                    deleteRangePredicate != null
+                            && deleteRangePredicate.test(
+                                    meta.numAddedFiles() + meta.numDeletedFiles(),
+                                    meta.partitionStats().minValues(),
+                                    meta.partitionStats().maxValues(),
+                                    meta.partitionStats().nullCounts());
+            if (inDeleteRange || meta.fileSize() < suggestedMetaSize) {
+                defaultCompactionMap.put(meta, inDeleteRange);
+            } else {
+                lsmFiles.add(meta);
+            }
+        }
+
+        return new ClassifyResult(lsmFiles, deleteEntries, defaultCompactionMap);
+    }
+
+    /**
+     * Groups manifest files into non-overlapping sorted runs and assigns a level to each run.
+     *
+     * <p>The input list is sorted in place by (min, max) partition value. Each file is then
+     * appended to the open run with the smallest trailing max value if the file's min is greater
+     * than or equal to that max; otherwise a new run is opened. Finally the runs are ordered by
+     * totalSize ascending and the last {@link ManifestPickStrategy#MAX_LEVEL} of them receive
+     * increasing levels ending at MAX_LEVEL, while all smaller runs get level 0. With fewer than
+     * MAX_LEVEL runs the largest run still gets MAX_LEVEL and the lowest levels stay unused.
+     *
+     * @param input manifest files to group; sorted in place
+     * @param fieldComparator comparator applied to partition min/max values
+     * @return runs ordered by totalSize ascending, with levels assigned
+     */
+    static List<ManifestAdjacentSortedRun> buildLevelSortedRuns(
+            List<ManifestFileMeta> input, RecordComparator fieldComparator) {
+        // Step 1: Order files by min value, breaking ties by max value.
+        Comparator<ManifestFileMeta> byMinThenMax =
+                (left, right) -> {
+                    int byMin =
+                            fieldComparator.compare(
+                                    left.partitionStats().minValues(),
+                                    right.partitionStats().minValues());
+                    return byMin != 0
+                            ? byMin
+                            : fieldComparator.compare(
+                                    left.partitionStats().maxValues(),
+                                    right.partitionStats().maxValues());
+                };
+        input.sort(byMinThenMax);
+
+        // Step 2: Assign files to runs. Open runs are ordered by the max value of their last file,
+        // so the head of the queue is the run that frees up earliest.
+        Comparator<List<ManifestFileMeta>> byTrailingMax =
+                (runA, runB) ->
+                        fieldComparator.compare(
+                                runA.get(runA.size() - 1).partitionStats().maxValues(),
+                                runB.get(runB.size() - 1).partitionStats().maxValues());
+        PriorityQueue<List<ManifestFileMeta>> openRuns = new PriorityQueue<>(byTrailingMax);
+
+        for (ManifestFileMeta file : input) {
+            List<ManifestFileMeta> earliest = openRuns.poll();
+            // Boundary equality (file min == run max) counts as non-overlapping here, so such
+            // files may share a run; splitIntoSections later puts them in separate sections.
+            boolean fitsEarliest =
+                    earliest != null
+                            && fieldComparator.compare(
+                                            file.partitionStats().minValues(),
+                                            earliest.get(earliest.size() - 1)
+                                                    .partitionStats()
+                                                    .maxValues())
+                                    >= 0;
+            if (fitsEarliest) {
+                earliest.add(file);
+                openRuns.offer(earliest);
+                continue;
+            }
+
+            // Overlap (or no run yet): return the polled run untouched, then open a new run.
+            if (earliest != null) {
+                openRuns.offer(earliest);
+            }
+            List<ManifestFileMeta> freshRun = new ArrayList<>();
+            freshRun.add(file);
+            openRuns.offer(freshRun);
+        }
+
+        // Step 3: Drain the queue into ManifestAdjacentSortedRun instances.
+        List<ManifestAdjacentSortedRun> levelled = new ArrayList<>(openRuns.size());
+        for (List<ManifestFileMeta> run = openRuns.poll(); run != null; run = openRuns.poll()) {
+            levelled.add(ManifestAdjacentSortedRun.fromSorted(run));
+        }
+
+        // Step 4: Order by totalSize and assign levels; indexes below firstLevelled get level 0.
+        levelled.sort(Comparator.comparingLong(ManifestAdjacentSortedRun::totalSize));
+        int firstLevelled = levelled.size() - ManifestPickStrategy.MAX_LEVEL;
+        for (int i = 0; i < levelled.size(); i++) {
+            levelled.get(i).setLevel(Math.max(0, i - firstLevelled + 1));
+        }
+        return levelled;
+    }
+
+    /**
+     * Split picked files into sections. Files with overlapping sort-key intervals go into the
+     * same section. Each section is built with pre-computed totalSize and hasDefaultCompactMeta.
+     *
+     * <p>{@code pickedFiles} is sorted in place by (min, max) of the sort key. A file whose min is
+     * greater than or equal to the running max bound of the current section starts a new section,
+     * so boundary-touching files (min == max bound) end up in different sections.
+     *
+     * @param pickedFiles files to split; sorted in place, may be empty
+     * @param fieldComparator comparator applied to partition min/max values
+     * @param defaultCompactionMap files whose presence marks a section as hasDefaultCompactMeta
+     * @return sections in ascending sort-key order; an empty list if {@code pickedFiles} is empty
+     */
+    static List<Section> splitIntoSections(
+            List<ManifestFileMeta> pickedFiles,
+            RecordComparator fieldComparator,
+            Map<ManifestFileMeta, Boolean> defaultCompactionMap) {
+        List<Section> sections = new ArrayList<>();
+        // An empty pick has no first file to seed a section with.
+        if (pickedFiles.isEmpty()) {
+            return sections;
+        }
+
+        pickedFiles.sort(
+                (a, b) -> {
+                    int cmp =
+                            fieldComparator.compare(
+                                    a.partitionStats().minValues(),
+                                    b.partitionStats().minValues());
+                    if (cmp != 0) {
+                        return cmp;
+                    }
+                    return fieldComparator.compare(
+                            a.partitionStats().maxValues(), b.partitionStats().maxValues());
+                });
+
+        // Seed the first section with the first file.
+        ManifestFileMeta first = pickedFiles.get(0);
+        List<ManifestFileMeta> currentFiles = new ArrayList<>();
+        currentFiles.add(first);
+        long currentTotalSize = first.fileSize();
+        boolean currentHasDefault = defaultCompactionMap.containsKey(first);
+        BinaryRow sectionMaxBound = first.partitionStats().maxValues();
+
+        for (int i = 1; i < pickedFiles.size(); i++) {
+            ManifestFileMeta file = pickedFiles.get(i);
+            // Boundary equality (file.min == sectionMaxBound) closes the current section:
+            // boundary-touching files are rewritten as separate sections instead of being sorted
+            // together, and since sections are emitted in ascending order the overall order is
+            // kept. Note that buildLevelSortedRuns uses the same ">= 0" test to allow such files
+            // in ONE sorted run; the two behaviors are intentionally different.
+            if (fieldComparator.compare(file.partitionStats().minValues(), sectionMaxBound) >= 0) {
+                sections.add(new Section(currentFiles, currentTotalSize, currentHasDefault));
+                currentFiles = new ArrayList<>();
+                currentFiles.add(file);
+                currentTotalSize = file.fileSize();
+                currentHasDefault = defaultCompactionMap.containsKey(file);
+                sectionMaxBound = file.partitionStats().maxValues();
+            } else {
+                currentFiles.add(file);
+                currentTotalSize += file.fileSize();
+                if (!currentHasDefault && defaultCompactionMap.containsKey(file)) {
+                    currentHasDefault = true;
+                }
+                // Widen the section bound when this file reaches further than all before it.
+                if (fieldComparator.compare(file.partitionStats().maxValues(), sectionMaxBound)
+                        > 0) {
+                    sectionMaxBound = file.partitionStats().maxValues();
+                }
+            }
+        }
+        sections.add(new Section(currentFiles, currentTotalSize, currentHasDefault));
+        return sections;
+    }
+
+    /**
+     * Folds small neighbouring sections together so the rewrite does not produce many tiny
+     * batches.
+     *
+     * <p>Sections are visited in order while one pending section is carried along. The pending
+     * section absorbs the next one whenever either of the two has a total size below {@code
+     * suggestedMetaSize}; it is emitted only when both are at least that large.
+     *
+     * @param sections sections in processing order
+     * @param suggestedMetaSize size threshold below which a section counts as small
+     * @return the merged sections, in the same relative order
+     */
+    private static List<Section> mergeSmallAdjacentSections(
+            List<Section> sections, long suggestedMetaSize) {
+        List<Section> merged = new ArrayList<>();
+        Section pending = null;
+
+        for (Section next : sections) {
+            if (pending == null) {
+                pending = next;
+                continue;
+            }
+            boolean bothLargeEnough =
+                    pending.totalSize >= suggestedMetaSize && next.totalSize >= suggestedMetaSize;
+            if (bothLargeEnough) {
+                merged.add(pending);
+                pending = next;
+            } else {
+                pending = Section.merge(pending, next);
+            }
+        }
+
+        if (pending != null) {
+            merged.add(pending);
+        }
+        return merged;
+    }
+
+    /**
+     * Rewrite sections with budget control.
+     *
+     * <p><b>Semantics of manifest-sort.max-rewrite-size:</b> This budget applies only to the
+     * sorted rewrite portion. Single-file sections are always handed to sortAndRewriteSection and
+     * do not count against the budget. For multi-file sections, when the cumulative size would
+     * exceed the limit:
+     *
+     * <ul>
+     *   <li>First overflow: The current section is split. Files that still fit into the remaining
+     *       budget are sorted and rewritten. The remaining files are appended to {@code sections}
+     *       as a new section for later processing by this same loop.
+     *   <li>Subsequent overflows: If the section has files in defaultCompactionMap (needs default
+     *       compaction), rewriteSubSegments is called to process it in smaller chunks. Otherwise,
+     *       the section is kept unchanged.
+     * </ul>
+     *
+     * <p>This design ensures that the budget only limits the aggressive sort rewrite, while still
+     * allowing necessary cleanup operations (delete entry elimination, small file merge) through
+     * the rewriteSubSegments fallback path.
+     *
+     * @param sections sections to process; may be extended by this method (see above)
+     * @param output receiver of unchanged and rewritten manifest metas
+     * @param sortNewFiles list that newly written manifest metas are appended to
+     * @param ctx shared compaction state
+     * @param manifestFile manifest file used for reading and writing
+     * @param suggestedMetaSize sub-segment size threshold for the fallback path
+     * @param suggestedMinMetaCount minimum tail count for the fallback path
+     * @param maxRewriteSize total size budget of the sorted rewrite
+     * @param manifestReadParallelism optional read parallelism, may be null
+     */
+    private static void rewriteSections(
+            List<Section> sections,
+            RewriteOutput output,
+            List<ManifestFileMeta> sortNewFiles,
+            CompactionContext ctx,
+            ManifestFile manifestFile,
+            long suggestedMetaSize,
+            int suggestedMinMetaCount,
+            long maxRewriteSize,
+            @Nullable Integer manifestReadParallelism)
+            throws Exception {
+        long processedSize = 0;
+        boolean reachedLimit = false;
+
+        // Index-based loop on purpose: the first overflowing section appends its remainder to
+        // `sections`, and that appended section must still be visited by this loop.
+        for (int i = 0; i < sections.size(); i++) {
+            Section section = sections.get(i);
+            if (section.files.size() == 1) {
+                sortAndRewriteSection(
+                        section.files,
+                        output,
+                        sortNewFiles,
+                        ctx,
+                        manifestFile,
+                        manifestReadParallelism);
+                continue;
+            }
+
+            if (processedSize + section.totalSize <= maxRewriteSize) {
+                processedSize += section.totalSize;
+                sortAndRewriteSection(
+                        section.files,
+                        output,
+                        sortNewFiles,
+                        ctx,
+                        manifestFile,
+                        manifestReadParallelism);
+            } else if (!reachedLimit) {
+                long rewriteTotalSize = maxRewriteSize - processedSize;
+                // Charge the whole section so that every later section takes an overflow branch.
+                processedSize += section.totalSize;
+                List<ManifestFileMeta> rewriteFiles = new ArrayList<>();
+                List<ManifestFileMeta> remainingFiles = new ArrayList<>();
+                long rewriteSize = 0;
+                long remainingSize = 0;
+                boolean remainingHasDefault = false;
+
+                for (ManifestFileMeta file : section.files) {
+                    if (rewriteSize + file.fileSize() <= rewriteTotalSize) {
+                        rewriteFiles.add(file);
+                        rewriteSize += file.fileSize();
+                    } else {
+                        remainingFiles.add(file);
+                        remainingSize += file.fileSize();
+                        if (ctx.defaultCompactionMap.containsKey(file)) {
+                            remainingHasDefault = true;
+                        }
+                    }
+                }
+
+                // Nothing may fit into the remaining budget; skip the rewrite in that case.
+                if (!rewriteFiles.isEmpty()) {
+                    sortAndRewriteSection(
+                            rewriteFiles,
+                            output,
+                            sortNewFiles,
+                            ctx,
+                            manifestFile,
+                            manifestReadParallelism);
+                }
+
+                if (!remainingFiles.isEmpty()) {
+                    Section remainingSection =
+                            new Section(remainingFiles, remainingSize, remainingHasDefault);
+                    // global manifest file metas order by sort key is not a required invariant
+                    sections.add(remainingSection);
+                }
+                reachedLimit = true;
+            } else if (section.hasDefaultCompactMeta) {
+                rewriteSubSegments(
+                        section.files,
+                        output,
+                        sortNewFiles,
+                        ctx,
+                        manifestFile,
+                        suggestedMetaSize,
+                        suggestedMinMetaCount,
+                        manifestReadParallelism);
+            } else {
+                output.addAllUnchanged(section.files);
+            }
+        }
+    }
+
+    /**
+     * Rewrites a section chunk by chunk once the sort rewrite budget is exhausted.
+     *
+     * <p>Files are accumulated in order; as soon as the accumulated size reaches {@code
+     * suggestedMetaSize} the chunk is handed to sortAndRewriteSection and a new chunk is started.
+     *
+     * <p><b>Semantics difference from old minor merge:</b> In the old ManifestFileMerger path, the
+     * trailing candidates are kept unchanged when their count is below manifest.merge-min-count.
+     * In this sort path, the caller triggers rewriteSubSegments whenever the section contains
+     * files from defaultCompactionMap, regardless of the manifest count. The
+     * manifest.merge-min-count threshold is still applied to the final chunk: the tail is
+     * rewritten only if delete entries exist or it holds at least {@code suggestedMinMetaCount}
+     * files; otherwise it is kept unchanged.
+     *
+     * @param section files of the section, in processing order
+     * @param output receiver of unchanged and rewritten manifest metas
+     * @param sortNewFiles list that newly written manifest metas are appended to
+     * @param ctx shared compaction state
+     * @param manifestFile manifest file used for reading and writing
+     * @param suggestedMetaSize accumulated size at which a chunk is flushed
+     * @param suggestedMinMetaCount minimum file count for rewriting the tail chunk
+     * @param manifestReadParallelism optional read parallelism, may be null
+     */
+    private static void rewriteSubSegments(
+            List<ManifestFileMeta> section,
+            RewriteOutput output,
+            List<ManifestFileMeta> sortNewFiles,
+            CompactionContext ctx,
+            ManifestFile manifestFile,
+            long suggestedMetaSize,
+            int suggestedMinMetaCount,
+            @Nullable Integer manifestReadParallelism)
+            throws Exception {
+        List<ManifestFileMeta> chunk = new ArrayList<>();
+        long chunkSize = 0;
+        for (ManifestFileMeta meta : section) {
+            chunk.add(meta);
+            chunkSize += meta.fileSize();
+            if (chunkSize < suggestedMetaSize) {
+                continue;
+            }
+
+            // Chunk is large enough: rewrite it and start over.
+            sortAndRewriteSection(
+                    chunk, output, sortNewFiles, ctx, manifestFile, manifestReadParallelism);
+            chunk = new ArrayList<>();
+            chunkSize = 0;
+        }
+
+        if (chunk.isEmpty()) {
+            return;
+        }
+
+        // Flush tail only if delete entries exist or file count >= minCount.
+        boolean rewriteTail =
+                !ctx.deleteEntries.isEmpty() || chunk.size() >= suggestedMinMetaCount;
+        if (rewriteTail) {
+            sortAndRewriteSection(
+                    chunk, output, sortNewFiles, ctx, manifestFile, manifestReadParallelism);
+        } else {
+            output.addAllUnchanged(chunk);
+        }
+    }
+
+    /**
+     * Sorts and rewrites one section, dispatching to the full or the minor compaction path.
+     *
+     * <p>A section that consists of a single file is passed through unchanged unless
+     * defaultCompactionMap maps that file to {@code true} (the file is in the delete range).
+     *
+     * <p>The full and minor paths append every newly written manifest meta to {@code
+     * sortNewFiles}. NOTE(review): the original comment states that this list is the same
+     * reference as the caller's newFilesForAbort, so new files get cleaned up in the caller's
+     * catch block; the caller is not visible here.
+     */
+    private static void sortAndRewriteSection(
+            List<ManifestFileMeta> section,
+            RewriteOutput output,
+            List<ManifestFileMeta> sortNewFiles,
+            CompactionContext ctx,
+            ManifestFile manifestFile,
+            @Nullable Integer manifestReadParallelism)
+            throws Exception {
+        if (section.size() == 1) {
+            ManifestFileMeta only = section.get(0);
+            boolean inDeleteRange = ctx.defaultCompactionMap.getOrDefault(only, false);
+            if (!inDeleteRange) {
+                // Nothing to merge and nothing to eliminate: keep the file as it is.
+                output.addUnchanged(only);
+                return;
+            }
+        }
+
+        if (!ctx.fullCompaction) {
+            sortAndRewriteMinor(
+                    section, output, sortNewFiles, ctx, manifestFile, manifestReadParallelism);
+            return;
+        }
+        sortAndRewriteFull(
+                section, output, sortNewFiles, ctx, manifestFile, manifestReadParallelism);
+    }
+
+    /**
+     * Full compaction path for one section.
+     *
+     * <p>Reads the ADD entries of every file in the section, drops those whose identifier is
+     * contained in {@code ctx.deleteEntries}, sorts the survivors together and writes them as one
+     * sorted stream. The written metas are reported through {@code output.addSortedFiles} and
+     * appended to {@code sortNewFiles}. Nothing is written when no entry survives.
+     */
+    private static void sortAndRewriteFull(
+            List<ManifestFileMeta> section,
+            RewriteOutput output,
+            List<ManifestFileMeta> sortNewFiles,
+            CompactionContext ctx,
+            ManifestFile manifestFile,
+            @Nullable Integer manifestReadParallelism)
+            throws Exception {
+        // Per-file reader: ADD entries only, minus the ones cancelled by deleteEntries.
+        Function<ManifestFileMeta, List<ManifestEntry>> survivingAdds =
+                meta -> {
+                    List<ManifestEntry> kept = new ArrayList<>();
+                    for (ManifestEntry candidate :
+                            manifestFile.read(
+                                    meta.fileName(),
+                                    meta.fileSize(),
+                                    FileEntry.addFilter(),
+                                    Filter.alwaysTrue())) {
+                        if (ctx.deleteEntries.contains(candidate.identifier())) {
+                            continue;
+                        }
+                        kept.add(candidate);
+                    }
+                    return kept;
+                };
+
+        List<ManifestEntry> survivors = new ArrayList<>();
+        sequentialBatchedExecute(survivingAdds, section, manifestReadParallelism)
+                .forEach(survivors::add);
+
+        if (survivors.isEmpty()) {
+            return;
+        }
+        List<ManifestFileMeta> written =
+                sortAndWriteEntries(survivors, ctx.fieldComparator, manifestFile);
+        output.addSortedFiles(written);
+        sortNewFiles.addAll(written);
+    }
+
+    /**
+     * Minor compaction path for one section.
+     *
+     * <p>Each file is read once through sequentialBatchedExecute; the per-file reader splits the
+     * entries into an ADD list and a non-ADD (DELETE) list and returns both as a single Pair. The
+     * pairs are then merged by the calling thread: ADD entries are indexed by identifier, and
+     * every DELETE entry whose identifier has a matching ADD in the section cancels that ADD and
+     * is dropped itself. The remaining ADD entries and the remaining DELETE entries are sorted
+     * and written independently, reported through {@code output.addSortedFiles} and {@code
+     * output.addDeleteFiles} respectively, and appended to {@code sortNewFiles}.
+     */
+    private static void sortAndRewriteMinor(
+            List<ManifestFileMeta> section,
+            RewriteOutput output,
+            List<ManifestFileMeta> sortNewFiles,
+            CompactionContext ctx,
+            ManifestFile manifestFile,
+            @Nullable Integer manifestReadParallelism)
+            throws Exception {
+        // Read and classify ADD/DELETE in one pass per file.
+        Function<ManifestFileMeta, List<Pair<List<ManifestEntry>, List<ManifestEntry>>>> reader =
+                meta -> {
+                    List<ManifestEntry> adds = new ArrayList<>();
+                    List<ManifestEntry> deletes = new ArrayList<>();
+                    for (ManifestEntry entry :
+                            manifestFile.read(meta.fileName(), meta.fileSize())) {
+                        (entry.kind() == FileKind.ADD ? adds : deletes).add(entry);
+                    }
+                    return singletonList(Pair.of(adds, deletes));
+                };
+
+        Map<FileEntry.Identifier, ManifestEntry> addsById = new HashMap<>();
+        List<ManifestEntry> remainingDeletes = new ArrayList<>();
+        for (Pair<List<ManifestEntry>, List<ManifestEntry>> perFile :
+                sequentialBatchedExecute(reader, section, manifestReadParallelism)) {
+            for (ManifestEntry add : perFile.getLeft()) {
+                addsById.put(add.identifier(), add);
+            }
+            remainingDeletes.addAll(perFile.getRight());
+        }
+
+        // Cancel out ADD+DELETE pairs with the same identifier within the section.
+        Iterator<ManifestEntry> deleteIterator = remainingDeletes.iterator();
+        while (deleteIterator.hasNext()) {
+            if (addsById.remove(deleteIterator.next().identifier()) != null) {
+                deleteIterator.remove();
+            }
+        }
+        List<ManifestEntry> remainingAdds = new ArrayList<>(addsById.values());
+
+        if (!remainingAdds.isEmpty()) {
+            List<ManifestFileMeta> writtenAdds =
+                    sortAndWriteEntries(remainingAdds, ctx.fieldComparator, manifestFile);
+            output.addSortedFiles(writtenAdds);
+            sortNewFiles.addAll(writtenAdds);
+        }
+
+        if (!remainingDeletes.isEmpty()) {
+            List<ManifestFileMeta> writtenDeletes =
+                    sortAndWriteEntries(remainingDeletes, ctx.fieldComparator, manifestFile);
+            output.addDeleteFiles(writtenDeletes);
+            sortNewFiles.addAll(writtenDeletes);
+        }
+    }
+
+    /**
+     * Sorts the entries in place by {@link #compareSortKey} and writes them through a rolling
+     * writer.
+     *
+     * <p>If writing throws an Exception the writer is aborted and the exception is rethrown;
+     * otherwise the writer is closed and its result returned.
+     *
+     * @param entries entries to write; sorted in place
+     * @param fieldComparator comparator used for the sort-field part of the ordering
+     * @param manifestFile manifest file that creates the rolling writer
+     * @return metas of the manifest files produced by the writer
+     */
+    private static List<ManifestFileMeta> sortAndWriteEntries(
+            List<ManifestEntry> entries,
+            RecordComparator fieldComparator,
+            ManifestFile manifestFile)
+            throws Exception {
+        entries.sort((left, right) -> compareSortKey(left, right, fieldComparator));
+
+        RollingFileWriter<ManifestEntry, ManifestFileMeta> rollingWriter =
+                manifestFile.createRollingWriter();
+        try {
+            rollingWriter.write(entries);
+        } catch (Exception writeFailure) {
+            // Discard whatever was written so far before propagating the failure.
+            rollingWriter.abort();
+            throw writeFailure;
+        }
+        rollingWriter.close();
+        return rollingWriter.result();
+    }
+
+    /**
+     * Orders two {@link ManifestEntry}s by the composite key {@code (sort-field, kind,
+     * fileName)}.
+     *
+     * <p>The partitions are compared with {@code fieldComparator} first. Ties are broken by the
+     * natural order of the entry kind (per the original note, ADD sorts before DELETE) and then
+     * by the data file name, so entries sharing the same sort-field value and the same data file
+     * are emitted contiguously.
+     */
+    static int compareSortKey(ManifestEntry a, ManifestEntry b, RecordComparator fieldComparator) {
+        int byField = fieldComparator.compare(a.partition(), b.partition());
+        if (byField == 0) {
+            int byKind = a.kind().compareTo(b.kind());
+            return byKind != 0 ? byKind : a.file().fileName().compareTo(b.file().fileName());
+        }
+        return byField;
+    }
+
+    /**
+     * Determines which partition field manifests are sorted by.
+     *
+     * <p>Resolution rules:
+     *
+     * <ol>
+     *   <li>A non-null, non-empty {@code sortPartitionField} (the configured {@code
+     *       manifest-sort.partition-field}) is returned as is; it is not checked against {@code
+     *       partitionType} here.
+     *   <li>Otherwise the name of the first field of {@code partitionType} is returned.
+     * </ol>
+     */
+    static String resolveSortField(String sortPartitionField, RowType partitionType) {
+        boolean configured = sortPartitionField != null && !sortPartitionField.isEmpty();
+        return configured ? sortPartitionField : partitionType.getFieldNames().get(0);
+    }
+
+    /**
+     * Strategy interface for writing compaction results. The rewrite code reports every manifest
+     * meta it keeps or produces through exactly one of these methods.
+     */
+    interface RewriteOutput {
+        /** Reports a single manifest file that is kept without being rewritten. */
+        void addUnchanged(ManifestFileMeta file);
+
+        /** Reports several manifest files that are kept without being rewritten. */
+        void addAllUnchanged(List<ManifestFileMeta> files);
+
+        /** Reports newly written manifest files that hold the sorted surviving ADD entries. */
+        void addSortedFiles(List<ManifestFileMeta> files);
+
+        /** Reports newly written manifest files that hold the sorted remaining DELETE entries. */
+        void addDeleteFiles(List<ManifestFileMeta> files);
+    }
+
+    /** {@link RewriteOutput} that appends every reported file to one flat result list. */
+    private static class FullCompactOutput implements RewriteOutput {
+        private final List<ManifestFileMeta> sink;
+
+        FullCompactOutput(List<ManifestFileMeta> result) {
+            this.sink = result;
+        }
+
+        @Override
+        public void addUnchanged(ManifestFileMeta file) {
+            sink.add(file);
+        }
+
+        @Override
+        public void addAllUnchanged(List<ManifestFileMeta> files) {
+            append(files);
+        }
+
+        @Override
+        public void addSortedFiles(List<ManifestFileMeta> files) {
+            append(files);
+        }
+
+        @Override
+        public void addDeleteFiles(List<ManifestFileMeta> files) {
+            append(files);
+        }
+
+        /** All bulk categories end up in the same list, in call order. */
+        private void append(List<ManifestFileMeta> files) {
+            sink.addAll(files);
+        }
+    }
+
+    /**
+     * {@link RewriteOutput} that distributes files over a list of buckets.
+     *
+     * <p>Unchanged files go to the bucket registered for their file name in {@code
+     * fileNameToIndex}; sorted files go to the bucket at the left index of {@code indexRange} and
+     * delete files to the bucket at its right index.
+     */
+    private static class MinorCompactOutput implements RewriteOutput {
+        private final List<List<ManifestFileMeta>> buckets;
+        private final Pair<Integer, Integer> bucketRange;
+        private final Map<String, Integer> bucketByFileName;
+
+        MinorCompactOutput(
+                List<List<ManifestFileMeta>> result,
+                Pair<Integer, Integer> indexRange,
+                Map<String, Integer> fileNameToIndex) {
+            this.buckets = result;
+            this.bucketRange = indexRange;
+            this.bucketByFileName = fileNameToIndex;
+        }
+
+        @Override
+        public void addUnchanged(ManifestFileMeta file) {
+            // The looked-up Integer is unboxed by List#get(int).
+            buckets.get(bucketByFileName.get(file.fileName())).add(file);
+        }
+
+        @Override
+        public void addAllUnchanged(List<ManifestFileMeta> files) {
+            files.forEach(this::addUnchanged);
+        }
+
+        @Override
+        public void addSortedFiles(List<ManifestFileMeta> files) {
+            List<ManifestFileMeta> sortedBucket = buckets.get(bucketRange.getLeft());
+            sortedBucket.addAll(files);
+        }
+
+        @Override
+        public void addDeleteFiles(List<ManifestFileMeta> files) {
+            List<ManifestFileMeta> deleteBucket = buckets.get(bucketRange.getRight());
+            deleteBucket.addAll(files);
+        }
+    }
+
+ /** A section of manifest files with pre-computed metadata. */
+ static class Section {
+ final List<ManifestFileMeta> files;
+ final long totalSize;
+ final boolean hasDefaultCompactMeta;
+
+ Section(List<ManifestFileMeta> files, long totalSize, boolean
hasDefaultCompactMeta) {
+ this.files = files;
+ this.totalSize = totalSize;
+ this.hasDefaultCompactMeta = hasDefaultCompactMeta;
+ }
+
+ /** Create a merged section from two sections. */
+ static Section merge(Section a, Section b) {
+ List<ManifestFileMeta> merged = new ArrayList<>(a.files);
+ merged.addAll(b.files);
+ return new Section(
+ merged,
+ a.totalSize + b.totalSize,
+ a.hasDefaultCompactMeta || b.hasDefaultCompactMeta);
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestPickStrategy.java
b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestPickStrategy.java
new file mode 100644
index 0000000000..519c49676c
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestPickStrategy.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.utils.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Pick strategy for manifest LSM Tree compaction.
+ *
+ * <p>Strategy priority:
+ *
+ * <ol>
+ *   <li><b>SizeAmp</b>: if the total size of all lower-level runs, expressed as a percentage of
+ *       the highest-level run's size, exceeds {@code sizeAmpThreshold}, trigger full compaction
+ *       (pick all runs).
+ *   <li><b>SizeRatio</b>: from low to high, pick a run above level 1 when the size picked so far,
+ *       amplified by {@code sizeRatioThreshold} percent, covers that run's size.
+ *   <li><b>Forced pick</b>: level0 and level1 runs are always picked.
+ * </ol>
+ *
+ * <p>Nothing is picked when there are at most {@link #MAX_LEVEL} runs.
+ */
+public class ManifestPickStrategy {
+
+    public static final int MAX_LEVEL = 4;
+
+    private final int sizeAmpThreshold;
+    private final int sizeRatioThreshold;
+
+    /**
+     * @param sizeAmpThreshold size-amplification threshold in percent, must be positive
+     * @param sizeRatioThreshold size-ratio threshold in percent, must be positive
+     */
+    public ManifestPickStrategy(int sizeAmpThreshold, int sizeRatioThreshold) {
+        Preconditions.checkArgument(sizeAmpThreshold > 0, "sizeAmpThreshold must be positive");
+        Preconditions.checkArgument(sizeRatioThreshold > 0, "sizeRatioThreshold must be positive");
+        this.sizeAmpThreshold = sizeAmpThreshold;
+        this.sizeRatioThreshold = sizeRatioThreshold;
+    }
+
+    /**
+     * Pick runs that need compaction from the given level runs.
+     *
+     * @param levelRuns runs with assigned levels (level 0~4); the last run is expected to carry
+     *     the highest level
+     * @return list of picked runs to compact; empty when there are at most {@link #MAX_LEVEL}
+     *     runs (this includes an empty input)
+     */
+    public List<ManifestAdjacentSortedRun> pick(List<ManifestAdjacentSortedRun> levelRuns) {
+        if (levelRuns.size() <= MAX_LEVEL) {
+            return new ArrayList<>();
+        }
+
+        // Try SizeAmp first
+        List<ManifestAdjacentSortedRun> sizeAmpResult = pickForSizeAmp(levelRuns);
+        if (sizeAmpResult != null) {
+            return sizeAmpResult;
+        }
+
+        // SizeRatio + forced pick
+        return pickForSizeRatioAndForce(levelRuns);
+    }
+
+    /**
+     * SizeAmp check: if the total size of all runs below the highest level exceeds {@code
+     * sizeAmpThreshold} percent of the highest-level run's size, pick all runs for full
+     * compaction.
+     *
+     * <p>Formula (consistent with {@code UniversalCompaction#pickForSizeAmp}): {@code
+     * lowerLevelTotalSize * 100 > sizeAmpThreshold * highestRunSize}
+     *
+     * @return a copy of all runs when the check triggers, otherwise {@code null}
+     */
+    private List<ManifestAdjacentSortedRun> pickForSizeAmp(
+            List<ManifestAdjacentSortedRun> levelRuns) {
+        if (levelRuns.isEmpty()) {
+            return null;
+        }
+
+        // The last run has the highest level (set by buildLevelSortedRuns)
+        ManifestAdjacentSortedRun highestRun = levelRuns.get(levelRuns.size() - 1);
+        int maxLevel = highestRun.level();
+
+        if (maxLevel <= 0) {
+            return null;
+        }
+
+        long lowerLevelTotalSize = 0;
+        for (ManifestAdjacentSortedRun run : levelRuns) {
+            if (run.level() < maxLevel) {
+                lowerLevelTotalSize += run.totalSize();
+            }
+        }
+
+        // size amplification = percentage of additional size
+        if (lowerLevelTotalSize * 100 > (long) sizeAmpThreshold * highestRun.totalSize()) {
+            return new ArrayList<>(levelRuns);
+        }
+        return null;
+    }
+
+    /**
+     * SizeRatio + forced pick.
+     *
+     * <ul>
+     *   <li>The first run and every run at level0 or level1 are always picked.
+     *   <li>A run above level1 is picked if the cumulative picked size with ratio amplification
+     *       covers that run's size.
+     * </ul>
+     *
+     * <p>Formula (same condition as {@code UniversalCompaction#pickForSizeRatio}, evaluated in
+     * integer arithmetic): {@code pickedSize * (100 + sizeRatioThreshold) >= nextRunSize * 100}
+     *
+     * @return the picked runs, or an empty list if only the first run would be picked
+     */
+    private List<ManifestAdjacentSortedRun> pickForSizeRatioAndForce(
+            List<ManifestAdjacentSortedRun> levelRuns) {
+        // levelRuns is already sorted by level ascending (set by buildLevelSortedRuns)
+        List<ManifestAdjacentSortedRun> picked = new ArrayList<>();
+
+        // Seed the pick with the first run; a result holding only this seed is discarded below.
+        picked.add(levelRuns.get(0));
+        long pickedSize = levelRuns.get(0).totalSize();
+
+        // Widen before adding so that a very large threshold cannot overflow int arithmetic.
+        long amplifiedPercent = 100L + sizeRatioThreshold;
+
+        // From the second run onward: forced pick level0/level1, then SizeRatio for the rest.
+        for (int i = 1; i < levelRuns.size(); i++) {
+            ManifestAdjacentSortedRun run = levelRuns.get(i);
+            if (run.level() <= 1) {
+                picked.add(run);
+                pickedSize += run.totalSize();
+            } else {
+                long nextRunSize = run.totalSize();
+                if (pickedSize * amplifiedPercent >= nextRunSize * 100L) {
+                    picked.add(run);
+                    pickedSize += nextRunSize;
+                }
+            }
+        }
+        // A single run has nothing to be compacted with.
+        if (picked.size() == 1) {
+            return new ArrayList<>();
+        }
+        return picked;
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index d07ad25819..50228385a9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -319,6 +319,8 @@ public class SchemaValidation {
validateChangelogReadSequenceNumber(schema, options);
validatePkClusteringOverride(options);
+
+ validateManifestSort(schema, options);
}
public static void validateFallbackBranch(SchemaManager schemaManager,
TableSchema schema) {
@@ -1032,4 +1034,22 @@ public class SchemaValidation {
}
}
}
+
+    /**
+     * Validates the manifest sort options. Does nothing unless manifest sort is enabled; when it
+     * is, the table must have partition keys, and a configured (non-null, non-empty) sort
+     * partition field must be one of them.
+     */
+    private static void validateManifestSort(TableSchema schema, CoreOptions options) {
+        if (!options.manifestSortEnabled()) {
+            return;
+        }
+
+        checkArgument(
+                !schema.partitionKeys().isEmpty(),
+                "Cannot enable '%s' for non-partition table.",
+                CoreOptions.MANIFEST_SORT_ENABLED.key());
+
+        String sortPartitionField = options.manifestSortPartitionField();
+        boolean fieldConfigured = sortPartitionField != null && !sortPartitionField.isEmpty();
+        if (fieldConfigured) {
+            checkArgument(
+                    schema.partitionKeys().contains(sortPartitionField),
+                    "'%s' = '%s' is not a partition field. Available partition fields: %s.",
+                    CoreOptions.MANIFEST_SORT_PARTITION_FIELD.key(),
+                    sortPartitionField,
+                    schema.partitionKeys());
+        }
+    }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
index 36b0d15f11..75a1ab0a84 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
@@ -18,16 +18,26 @@
package org.apache.paimon.manifest;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.fs.SeekableInputStreamWrapper;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.operation.ManifestFileMerger;
+import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.stats.StatsTestUtils;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FailingFileIO;
+import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
@@ -42,6 +52,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -84,9 +95,16 @@ public class ManifestFileMetaTest extends
ManifestFileMetaTestBase {
createData(numLastBits, input, expected);
// no trigger Full Compaction
+ Options testOptions = new Options();
+ testOptions.set("manifest.target-file-size", "500B");
+ testOptions.set("manifest.merge-min-count", "3");
+ testOptions.set("manifest.full-compaction-threshold-size",
"9223372036854775807B");
List<ManifestFileMeta> actual =
ManifestFileMerger.merge(
- input, manifestFile, 500, 3, Long.MAX_VALUE,
getPartitionType(), null);
+ input,
+ manifestFile,
+ getPartitionType(),
+ CoreOptions.fromMap(testOptions.toMap()));
assertThat(actual).hasSameSizeAs(expected);
// these two manifest files are merged from the input
@@ -118,14 +136,16 @@ public class ManifestFileMetaTest extends
ManifestFileMetaTestBase {
ManifestFile failingManifestFile =
createManifestFile(FailingFileIO.getFailingPath(failingName,
tempDir.toString()));
try {
+ Options testOptions = new Options();
+ testOptions.set("manifest.target-file-size", "500B");
+ testOptions.set("manifest.merge-min-count", "3");
+ testOptions.set(
+ "manifest.full-compaction-threshold-size",
fullCompactionThreshold + "B");
ManifestFileMerger.merge(
input,
failingManifestFile,
- 500,
- 3,
- fullCompactionThreshold,
getPartitionType(),
- null);
+ CoreOptions.fromMap(testOptions.toMap()));
} catch (Throwable e) {
assertThat(e).hasRootCauseExactlyInstanceOf(FailingFileIO.ArtificialException.class);
// old files should be kept untouched, while new files should be
cleaned up
@@ -156,9 +176,16 @@ public class ManifestFileMetaTest extends
ManifestFileMetaTestBase {
// delta with delete apply partition 1,2
addDeltaManifests(input, true);
// trigger full compaction
+ Options testOptions = new Options();
+ testOptions.set("manifest.target-file-size", "500B");
+ testOptions.set("manifest.merge-min-count", "3");
+ testOptions.set("manifest.full-compaction-threshold-size", "200B");
List<ManifestFileMeta> merged =
ManifestFileMerger.merge(
- input, manifestFile, 500, 3, 200, getPartitionType(),
null);
+ input,
+ manifestFile,
+ getPartitionType(),
+ CoreOptions.fromMap(testOptions.toMap()));
// 1st Manifest don't need to Merge
assertSameContent(input.get(0), merged.get(0), manifestFile);
@@ -173,9 +200,16 @@ public class ManifestFileMetaTest extends
ManifestFileMetaTestBase {
// base
List<ManifestFileMeta> input = createBaseManifestFileMetas(true);
+ Options testOptions = new Options();
+ testOptions.set("manifest.target-file-size", "500B");
+ testOptions.set("manifest.merge-min-count", "3");
+ testOptions.set("manifest.full-compaction-threshold-size", "200B");
List<ManifestFileMeta> merged =
ManifestFileMerger.merge(
- input, manifestFile, 500, 3, 200, getPartitionType(),
null);
+ input,
+ manifestFile,
+ getPartitionType(),
+ CoreOptions.fromMap(testOptions.toMap()));
assertEquivalentEntries(input, merged);
assertThat(merged).hasSameElementsAs(input);
@@ -186,9 +220,16 @@ public class ManifestFileMetaTest extends
ManifestFileMetaTestBase {
ManifestFileMeta delta = makeManifest(makeEntry(true, "A", 1),
makeEntry(false, "A", 1));
input1.add(delta);
+ Options testOptions1 = new Options();
+ testOptions1.set("manifest.target-file-size", "500B");
+ testOptions1.set("manifest.merge-min-count", "3");
+ testOptions1.set("manifest.full-compaction-threshold-size", "200B");
List<ManifestFileMeta> merged1 =
ManifestFileMerger.merge(
- input1, manifestFile, 500, 3, 200, getPartitionType(),
null);
+ input1,
+ manifestFile,
+ getPartitionType(),
+ CoreOptions.fromMap(testOptions1.toMap()));
assertThat(base).hasSameElementsAs(merged1);
assertEquivalentEntries(input1, merged1);
@@ -198,9 +239,16 @@ public class ManifestFileMetaTest extends
ManifestFileMetaTestBase {
public void testMergeWithoutBase() {
List<ManifestFileMeta> input = new ArrayList<>();
addDeltaManifests(input, true);
+ Options testOptions = new Options();
+ testOptions.set("manifest.target-file-size", "500B");
+ testOptions.set("manifest.merge-min-count", "3");
+ testOptions.set("manifest.full-compaction-threshold-size", "200B");
List<ManifestFileMeta> merged =
ManifestFileMerger.merge(
- input, manifestFile, 500, 3, 200, getPartitionType(),
null);
+ input,
+ manifestFile,
+ getPartitionType(),
+ CoreOptions.fromMap(testOptions.toMap()));
assertEquivalentEntries(input, merged);
}
@@ -225,9 +273,16 @@ public class ManifestFileMetaTest extends
ManifestFileMetaTestBase {
input.add(makeManifest(makeEntry(true, "F")));
input.add(makeManifest(makeEntry(true, "G")));
+ Options testOptions = new Options();
+ testOptions.set("manifest.target-file-size", "500B");
+ testOptions.set("manifest.merge-min-count", "3");
+ testOptions.set("manifest.full-compaction-threshold-size", "200B");
List<ManifestFileMeta> merged =
ManifestFileMerger.merge(
- input, manifestFile, 500, 3, 200, getPartitionType(),
null);
+ input,
+ manifestFile,
+ getPartitionType(),
+ CoreOptions.fromMap(testOptions.toMap()));
assertEquivalentEntries(input, merged);
}
@@ -489,9 +544,16 @@ public class ManifestFileMetaTest extends
ManifestFileMetaTestBase {
input.add(makeManifest(makeEntry(true, "F")));
input.add(makeManifest(makeEntry(true, "G")));
+ Options testOptions = new Options();
+ testOptions.set("manifest.target-file-size", threshold + "B");
+ testOptions.set("manifest.merge-min-count", "3");
+ testOptions.set("manifest.full-compaction-threshold-size", "200B");
List<ManifestFileMeta> merged =
ManifestFileMerger.merge(
- input, manifestFile, threshold, 3, 200,
getPartitionType(), null);
+ input,
+ manifestFile,
+ getPartitionType(),
+ CoreOptions.fromMap(testOptions.toMap()));
assertEquivalentEntries(
input.stream()
.filter(f -> !baseFiles.contains(f.fileName()))
@@ -819,4 +881,621 @@ public class ManifestFileMetaTest extends
ManifestFileMetaTestBase {
}
}
}
+
+ // ==================== Manifest Sort Tests ====================
+
+ /**
+ * Test manifest sort with overlapping partition ranges. Each manifest
+ * contains entries spanning multiple partitions, creating overlapping
+ * intervals that require sort rewrite to resolve. After sort rewrite, all
+ * surviving ADD entries should be sorted by partition field.
+ *
+ * <p>Only 'manifest-sort.enabled' is set explicitly; no other option is
+ * overridden. The assertions check that the merged manifests are
+ * equivalent to the input (assertEquivalentEntries) and that, within each
+ * output manifest, partition field 0 is non-decreasing from one entry to
+ * the next.
+ */
+ @Test
+ public void testManifestSortWithOverlappingPartitions() {
+ List<ManifestFileMeta> input = new ArrayList<>();
+
+ // manifest-A: partitions [5, 13]
+ List<ManifestEntry> entriesA = new ArrayList<>();
+ for (int p = 5; p <= 13; p++) {
+ entriesA.add(makeEntry(true, String.format("A-p%d", p), p));
+ }
+ input.add(makeManifest(entriesA.toArray(new ManifestEntry[0])));
+
+ // manifest-B: partitions [0, 9]
+ List<ManifestEntry> entriesB = new ArrayList<>();
+ for (int p = 0; p <= 9; p++) {
+ entriesB.add(makeEntry(true, String.format("B-p%d", p), p));
+ }
+ input.add(makeManifest(entriesB.toArray(new ManifestEntry[0])));
+
+ // manifest-C: partitions [3, 7] -- overlaps with A and B
+ List<ManifestEntry> entriesC = new ArrayList<>();
+ for (int p = 3; p <= 7; p++) {
+ entriesC.add(makeEntry(true, String.format("C-p%d", p), p));
+ }
+ input.add(makeManifest(entriesC.toArray(new ManifestEntry[0])));
+
+ // manifest-D: partitions [8, 12] -- overlaps with A
+ List<ManifestEntry> entriesD = new ArrayList<>();
+ for (int p = 8; p <= 12; p++) {
+ entriesD.add(makeEntry(true, String.format("D-p%d", p), p));
+ }
+ input.add(makeManifest(entriesD.toArray(new ManifestEntry[0])));
+
+ // manifest-E: partitions [1, 6] -- overlaps with B and C
+ List<ManifestEntry> entriesE = new ArrayList<>();
+ for (int p = 1; p <= 6; p++) {
+ entriesE.add(makeEntry(true, String.format("E-p%d", p), p));
+ }
+ input.add(makeManifest(entriesE.toArray(new ManifestEntry[0])));
+
+ // manifest-F: partitions [4, 14] -- overlaps with D
+ List<ManifestEntry> entriesF = new ArrayList<>();
+ for (int p = 4; p <= 14; p++) {
+ entriesF.add(makeEntry(true, String.format("F-p%d", p), p));
+ }
+ input.add(makeManifest(entriesF.toArray(new ManifestEntry[0])));
+
+ Options testOptions = new Options();
+ testOptions.set("manifest-sort.enabled", "true");
+ List<ManifestFileMeta> merged =
+ ManifestFileMerger.merge(
+ input,
+ manifestFile,
+ getPartitionType(),
+ CoreOptions.fromMap(testOptions.toMap()));
+
+ // Verify entries are equivalent (no data loss)
+ assertEquivalentEntries(input, merged);
+
+ // Verify all entries within each output manifest are sorted by
+ // partition field 0 (non-decreasing; equal neighbours are allowed)
+ for (ManifestFileMeta meta : merged) {
+ List<ManifestEntry> entries = manifestFile.read(meta.fileName(),
meta.fileSize());
+ for (int i = 1; i < entries.size(); i++) {
+ int prevPartition = entries.get(i - 1).partition().getInt(0);
+ int currPartition = entries.get(i).partition().getInt(0);
+ assertThat(currPartition)
+ .as("Entries within a manifest should be sorted by
partition")
+ .isGreaterThanOrEqualTo(prevPartition);
+ }
+ }
+ }
+
+ /**
+ * Test that sort rewrite correctly eliminates DELETE entries and their
+ * corresponding ADD entries. The key condition is that totalDeltaFileSize
+ * must reach manifestFullCompactionSize to trigger the full compaction path
+ * inside trySortRewrite, which reads deleteEntries and passes them to
+ * sortAndRewriteSection for elimination. The test sets
+ * 'manifest.full-compaction-threshold-size' to 10B for this purpose.
+ *
+ * <p>Design:
+ *
+ * <pre>
+ * - Base manifests with overlapping partitions (all ADD, described by the
+ * author as "mustChange" since fileSize &lt; suggestedMetaSize):
+ * manifest-A: partitions [0, 4] with entries A-p0..A-p4
+ * manifest-B: partitions [2, 6] with entries B-p2..B-p6 (overlaps A)
+ * manifest-C: partitions [5, 9] with entries C-p5..C-p9 (overlaps B)
+ * NOTE(review): the original wording was "large enough to be
+ * mustChange", which contradicts fileSize &lt; suggestedMetaSize;
+ * presumably "small enough" was meant -- confirm against the sorter.
+ * - Delta manifests with DELETE entries (cancel some ADD entries):
+ * manifest-D: DELETE A-p2, DELETE B-p4, ADD new-p2, ADD new-p4
+ * manifest-E: DELETE C-p7, ADD new-p7
+ * - After sort rewrite: A-p2, B-p4, C-p7 should be eliminated,
+ * replaced by new-p2, new-p4, new-p7. Output should only contain ADD
+ * entries, sorted by partition.
+ * </pre>
+ */
+ @Test
+ public void testManifestSortEliminatesDeleteEntries() {
+ List<ManifestFileMeta> input = new ArrayList<>();
+
+ // manifest-A: partitions [0, 4]
+ List<ManifestEntry> entriesA = new ArrayList<>();
+ for (int p = 0; p <= 4; p++) {
+ entriesA.add(makeEntry(true, String.format("A-p%d", p), p));
+ }
+ input.add(makeManifest(entriesA.toArray(new ManifestEntry[0])));
+
+ // manifest-B: partitions [2, 6] -- overlaps A
+ List<ManifestEntry> entriesB = new ArrayList<>();
+ for (int p = 2; p <= 6; p++) {
+ entriesB.add(makeEntry(true, String.format("B-p%d", p), p));
+ }
+ input.add(makeManifest(entriesB.toArray(new ManifestEntry[0])));
+
+ // manifest-C: partitions [5, 9] -- overlaps B
+ List<ManifestEntry> entriesC = new ArrayList<>();
+ for (int p = 5; p <= 9; p++) {
+ entriesC.add(makeEntry(true, String.format("C-p%d", p), p));
+ }
+ input.add(makeManifest(entriesC.toArray(new ManifestEntry[0])));
+
+ // manifest-D: DELETE A-p2, DELETE B-p4, ADD new-p2, ADD new-p4
+ input.add(
+ makeManifest(
+ makeEntry(false, "A-p2", 2),
+ makeEntry(false, "B-p4", 4),
+ makeEntry(true, "new-p2", 2),
+ makeEntry(true, "new-p4", 4)));
+
+ // manifest-E: DELETE C-p7, ADD new-p7
+ input.add(makeManifest(makeEntry(false, "C-p7", 7), makeEntry(true,
"new-p7", 7)));
+
+ Options testOptions = new Options();
+ testOptions.set("manifest-sort.enabled", "true");
+ testOptions.set("manifest.full-compaction-threshold-size", "10B");
+
+ List<ManifestFileMeta> merged =
+ ManifestFileMerger.merge(
+ input,
+ manifestFile,
+ getPartitionType(),
+ CoreOptions.fromMap(testOptions.toMap()));
+
+ // Collect all output entries
+ List<ManifestEntry> allOutputEntries = new ArrayList<>();
+ for (ManifestFileMeta meta : merged) {
+ allOutputEntries.addAll(manifestFile.read(meta.fileName(),
meta.fileSize()));
+ }
+
+ // Verify: no DELETE entries in output (all DELETE pairs eliminated)
+ long deleteCount =
+ allOutputEntries.stream().filter(e -> e.kind() ==
FileKind.DELETE).count();
+ assertThat(deleteCount).as("Sort rewrite should eliminate all DELETE
entries").isEqualTo(0);
+
+ // Verify: the deleted ADD entries (A-p2, B-p4, C-p7) are NOT in output
+ Set<String> outputFileNames =
+ allOutputEntries.stream().map(e ->
e.file().fileName()).collect(Collectors.toSet());
+ assertThat(outputFileNames).doesNotContain("A-p2", "B-p4", "C-p7");
+
+ // Verify: the replacement entries (new-p2, new-p4, new-p7) ARE in
+ // output
+ assertThat(outputFileNames).contains("new-p2", "new-p4", "new-p7");
+
+ // Verify: all surviving entries match what FileEntry.mergeEntries
+ // would produce
+ assertEquivalentEntries(input, merged);
+
+ // Verify entries within each output manifest are sorted by partition
+ for (ManifestFileMeta meta : merged) {
+ List<ManifestEntry> entries = manifestFile.read(meta.fileName(),
meta.fileSize());
+ for (int i = 1; i < entries.size(); i++) {
+ int prevPartition = entries.get(i - 1).partition().getInt(0);
+ int currPartition = entries.get(i).partition().getInt(0);
+ assertThat(currPartition)
+ .as("Entries within manifest should be sorted by
partition")
+ .isGreaterThanOrEqualTo(prevPartition);
+ }
+ }
+ }
+
+ /**
+ * Test manifest sort with a multi-field partition type.
+ *
+ * <p>Setup: partition=(region INT, dt INT, hour INT), sort by dt (field
+ * index=1). 9 manifest files form 6 overlapping sorted runs by dt range:
+ *
+ * <pre>
+ * Run1: 3 files, dt=[0,15],[3,5],[6,8]
+ * Run2: 2 files, dt=[1,8],[5,7]
+ * Run3: 1 file, dt=[0,9]
+ * Run4: 1 file, dt=[5,14]
+ * Run5: 1 file, dt=[8,15]
+ * Run6: 1 file, dt=[4,12]
+ * </pre>
+ *
+ * <p>Verifies: 1) no data loss after sort-rewrite, 2) entries within each
+ * output manifest are sorted by dt.
+ */
+ @Test
+ public void testManifestSortWithMultiplePartitions() {
+ // Use a 3-field partition type: (region INT, dt INT, hour INT)
+ RowType multiPartitionType = RowType.of(new IntType(), new IntType(),
new IntType());
+
+ // Create a dedicated ManifestFile for the 3-field partition type
+ Path path = new Path(tempDir.toString());
+ FileIO fileIO = FileIOFinder.find(path);
+ ManifestFile multiPartManifestFile =
+ new ManifestFile.Factory(
+ fileIO,
+ new SchemaManager(fileIO, path),
+ multiPartitionType,
+ avro,
+ "zstd",
+ new FileStorePathFactory(
+ path,
+ multiPartitionType,
+ "default",
+ CoreOptions.FILE_FORMAT.defaultValue(),
+
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
+
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
+
CoreOptions.PARTITION_GENERATE_LEGACY_NAME.defaultValue(),
+
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+
CoreOptions.FILE_COMPRESSION.defaultValue(),
+ null,
+ null,
+ CoreOptions.ExternalPathStrategy.NONE,
+ null,
+ false,
+ null),
+ Long.MAX_VALUE,
+ null)
+ .create();
+
+ List<ManifestFileMeta> input = new ArrayList<>();
+
+ // Run1
+ input.add(
+ multiPartManifestFile
+ .write(
+ Arrays.asList(
+ makeMultiPartEntry(true, "r1a-p0", 10,
0, 1),
+ makeMultiPartEntry(true, "r1a-p1", 20,
1, 2),
+ makeMultiPartEntry(true, "r1a-p2", 30,
15, 3)))
+ .get(0));
+ input.add(
+ multiPartManifestFile
+ .write(
+ Arrays.asList(
+ makeMultiPartEntry(true, "r1b-p3", 10,
3, 4),
+ makeMultiPartEntry(true, "r1b-p4", 20,
4, 5),
+ makeMultiPartEntry(true, "r1b-p5", 30,
5, 6)))
+ .get(0));
+ input.add(
+ multiPartManifestFile
+ .write(
+ Arrays.asList(
+ makeMultiPartEntry(true, "r1c-p6", 10,
6, 7),
+ makeMultiPartEntry(true, "r1c-p7", 20,
7, 8),
+ makeMultiPartEntry(true, "r1c-p8", 30,
8, 9)))
+ .get(0));
+
+ // Run2
+ input.add(
+ multiPartManifestFile
+ .write(
+ Arrays.asList(
+ makeMultiPartEntry(true, "r2a-p1", 5,
1, 10),
+ makeMultiPartEntry(true, "r2a-p2", 15,
2, 11),
+ makeMultiPartEntry(true, "r2a-p3", 25,
3, 12),
+ makeMultiPartEntry(true, "r2a-p4", 35,
8, 13)))
+ .get(0));
+ input.add(
+ multiPartManifestFile
+ .write(
+ Arrays.asList(
+ makeMultiPartEntry(true, "r2b-p5", 5,
5, 14),
+ makeMultiPartEntry(true, "r2b-p6", 15,
6, 15),
+ makeMultiPartEntry(true, "r2b-p7", 25,
7, 16)))
+ .get(0));
+
+ // Run3
+ List<ManifestEntry> run3Entries = new ArrayList<>();
+ for (int p = 0; p <= 9; p++) {
+ run3Entries.add(makeMultiPartEntry(true, String.format("r3-p%d",
p), 99, p, p + 20));
+ }
+ input.add(multiPartManifestFile.write(run3Entries).get(0));
+
+ // Run4
+ input.add(
+ multiPartManifestFile
+ .write(
+ Arrays.asList(
+ makeMultiPartEntry(true, "r4a-p10",
10, 5, 30),
+ makeMultiPartEntry(true, "r4a-p11",
20, 11, 31),
+ makeMultiPartEntry(true, "r4a-p12",
30, 12, 32),
+ makeMultiPartEntry(true, "r4a-p13",
40, 13, 33),
+ makeMultiPartEntry(true, "r4a-p14",
50, 14, 34)))
+ .get(0));
+
+ // Run5
+ input.add(
+ multiPartManifestFile
+ .write(
+ Arrays.asList(
+ makeMultiPartEntry(true, "r5a-p11",
11, 8, 40),
+ makeMultiPartEntry(true, "r5a-p12",
21, 12, 41),
+ makeMultiPartEntry(true, "r5a-p13",
31, 13, 42),
+ makeMultiPartEntry(true, "r5a-p14",
41, 14, 43),
+ makeMultiPartEntry(true, "r5a-p15",
51, 15, 44)))
+ .get(0));
+
+ // Run6
+ input.add(
+ multiPartManifestFile
+ .write(
+ Arrays.asList(
+ makeMultiPartEntry(true, "r6a-p7", 7,
4, 50),
+ makeMultiPartEntry(true, "r6a-p8", 17,
8, 51),
+ makeMultiPartEntry(true, "r6a-p9", 27,
9, 52),
+ makeMultiPartEntry(true, "r6a-p10",
37, 10, 53),
+ makeMultiPartEntry(true, "r6a-p11",
47, 11, 54),
+ makeMultiPartEntry(true, "r6a-p12",
57, 12, 55)))
+ .get(0));
+
+ Options testOptions = new Options();
+ testOptions.set("manifest-sort.enabled", "true");
+ // Sort by the second partition field "f1" (dt)
+ // NOTE(review): presumably RowType.of names unnamed fields f0, f1,
+ // f2 -- confirm, since the name "f1" is not declared in this test.
+ testOptions.set("manifest-sort.partition-field", "f1");
+ List<ManifestFileMeta> merged =
+ ManifestFileMerger.merge(
+ input,
+ multiPartManifestFile,
+ multiPartitionType,
+ CoreOptions.fromMap(testOptions.toMap()));
+
+ // Verify no data loss: the expected side is the ADD entries left
+ // after FileEntry.mergeEntries over all input entries, the actual
+ // side is every entry read back from the merged manifests. Both are
+ // rendered as "<kind>-<fileName>" strings; hasSameElementsAs compares
+ // them ignoring order and duplicates.
+ List<ManifestEntry> inputEntries =
+ input.stream()
+ .flatMap(
+ f ->
+
multiPartManifestFile.read(f.fileName(), f.fileSize())
+ .stream())
+ .collect(Collectors.toList());
+ List<String> entryBeforeMerge =
+ FileEntry.mergeEntries(inputEntries).stream()
+ .filter(entry -> entry.kind() == FileKind.ADD)
+ .map(entry -> entry.kind() + "-" +
entry.file().fileName())
+ .collect(Collectors.toList());
+ List<String> entryAfterMerge = new ArrayList<>();
+ for (ManifestFileMeta meta : merged) {
+ for (ManifestEntry entry :
+ multiPartManifestFile.read(meta.fileName(),
meta.fileSize())) {
+ entryAfterMerge.add(entry.kind() + "-" +
entry.file().fileName());
+ }
+ }
+ assertThat(entryBeforeMerge).hasSameElementsAs(entryAfterMerge);
+
+ // Verify entries within each output manifest are sorted by the second
+ // field (dt), i.e. partition field index 1 is non-decreasing
+ for (ManifestFileMeta meta : merged) {
+ List<ManifestEntry> entries =
+ multiPartManifestFile.read(meta.fileName(),
meta.fileSize());
+ for (int i = 1; i < entries.size(); i++) {
+ int prevDt = entries.get(i - 1).partition().getInt(1);
+ int currDt = entries.get(i).partition().getInt(1);
+ assertThat(currDt)
+ .as("Entries within manifest should be sorted by
partition")
+ .isGreaterThanOrEqualTo(prevDt);
+ }
+ }
+ }
+
+ /**
+ * Test that when manifest-sort.max-rewrite-size budget is exceeded in the
+ * middle of a section, the remaining files are appended to the tail and
+ * the final manifest order is preserved.
+ *
+ * <p>Design:
+ *
+ * <pre>
+ * - Create a large section with overlapping partition ranges that
+ * exceeds the budget
+ * - Set a small manifest-sort.max-rewrite-size to force budget split
+ * - Verify that after merge, all manifests are globally sorted by
+ * partition field
+ * - Verify that entries are equivalent (no data loss)
+ * </pre>
+ */
+ @Test
+ public void testManifestSortBudgetSplitPreservesOrder() {
+ // Create manifests with overlapping ranges whose recorded sizes add up
+ // to more than the rewrite budget
+ List<ManifestFileMeta> input = new ArrayList<>();
+
+ // Manifest A: partitions [0, 10]
+ List<ManifestEntry> entriesA = new ArrayList<>();
+ for (int p = 0; p <= 10; p++) {
+ entriesA.add(makeEntry(true, String.format("A-p%d", p), p));
+ }
+ ManifestFileMeta manifestA = makeManifest(entriesA.toArray(new
ManifestEntry[0]));
+ // Re-wrap the meta with a hard-coded fileSize of 100; every other
+ // field is copied from the meta returned by makeManifest.
+ // NOTE(review): the original comment said the size is "increased";
+ // whether 100 exceeds the real file size is not visible here.
+ input.add(
+ new ManifestFileMeta(
+ manifestA.fileName(),
+ 100,
+ manifestA.numAddedFiles(),
+ manifestA.numDeletedFiles(),
+ manifestA.partitionStats(),
+ manifestA.schemaId(),
+ manifestA.minBucket(),
+ manifestA.maxBucket(),
+ manifestA.minLevel(),
+ manifestA.maxLevel(),
+ manifestA.minRowId(),
+ manifestA.maxRowId()));
+
+ // Manifest B: partitions [5, 15] - overlaps with A
+ List<ManifestEntry> entriesB = new ArrayList<>();
+ for (int p = 5; p <= 15; p++) {
+ entriesB.add(makeEntry(true, String.format("B-p%d", p), p));
+ }
+ ManifestFileMeta manifestB = makeManifest(entriesB.toArray(new
ManifestEntry[0]));
+ input.add(
+ new ManifestFileMeta(
+ manifestB.fileName(),
+ 100,
+ manifestB.numAddedFiles(),
+ manifestB.numDeletedFiles(),
+ manifestB.partitionStats(),
+ manifestB.schemaId(),
+ manifestB.minBucket(),
+ manifestB.maxBucket(),
+ manifestB.minLevel(),
+ manifestB.maxLevel(),
+ manifestB.minRowId(),
+ manifestB.maxRowId()));
+
+ // Manifest C: partitions [10, 20] - overlaps with B
+ List<ManifestEntry> entriesC = new ArrayList<>();
+ for (int p = 10; p <= 20; p++) {
+ entriesC.add(makeEntry(true, String.format("C-p%d", p), p));
+ }
+ ManifestFileMeta manifestC = makeManifest(entriesC.toArray(new
ManifestEntry[0]));
+ input.add(
+ new ManifestFileMeta(
+ manifestC.fileName(),
+ 100,
+ manifestC.numAddedFiles(),
+ manifestC.numDeletedFiles(),
+ manifestC.partitionStats(),
+ manifestC.schemaId(),
+ manifestC.minBucket(),
+ manifestC.maxBucket(),
+ manifestC.minLevel(),
+ manifestC.maxLevel(),
+ manifestC.minRowId(),
+ manifestC.maxRowId()));
+
+ // Set small budget to force split
+ Options testOptions = new Options();
+ testOptions.set("manifest-sort.enabled", "true");
+ // Total input size is 300B (three metas with fileSize 100 each)
+ testOptions.set("manifest-sort.max-rewrite-size", "150B");
+
+ List<ManifestFileMeta> merged =
+ ManifestFileMerger.merge(
+ input,
+ manifestFile,
+ getPartitionType(),
+ CoreOptions.fromMap(testOptions.toMap()));
+
+ // Verify entries are equivalent
+ assertEquivalentEntries(input, merged);
+
+ // Verify global ordering: all manifests sorted by partition min value
+ // (field 0 of partitionStats().minValues(), non-decreasing)
+ for (int i = 1; i < merged.size(); i++) {
+ BinaryRow prevMin = merged.get(i - 1).partitionStats().minValues();
+ BinaryRow currMin = merged.get(i).partitionStats().minValues();
+ assertThat(currMin.getInt(0))
+ .as("Manifests should be globally sorted by partition
field")
+ .isGreaterThanOrEqualTo(prevMin.getInt(0));
+ }
+
+ // Verify entries within each manifest are sorted
+ for (ManifestFileMeta meta : merged) {
+ List<ManifestEntry> entries = manifestFile.read(meta.fileName(),
meta.fileSize());
+ for (int i = 1; i < entries.size(); i++) {
+ int prevPartition = entries.get(i - 1).partition().getInt(0);
+ int currPartition = entries.get(i).partition().getInt(0);
+ assertThat(currPartition)
+ .as("Entries within manifest should be sorted by
partition")
+ .isGreaterThanOrEqualTo(prevPartition);
+ }
+ }
+ }
+
+ /**
+ * Test boundary equality (min == previous.max) handling in both SortedRun
+ * construction and Section splitting. Boundary-touching files should be
+ * allowed in the same SortedRun but may be separated into different
+ * Sections.
+ *
+ * <p>Design:
+ *
+ * <pre>
+ * - Create manifests with boundary-touching partition ranges
+ * - Manifest A: [0, 5]
+ * - Manifest B: [5, 10] (min == A.max, boundary touching)
+ * - Manifest C: [10, 15] (min == B.max, boundary touching)
+ * - Intent: they can be in the same SortedRun (&gt;= comparison)
+ * - Intent: they may be split into different Sections (&gt;= comparison)
+ * </pre>
+ *
+ * <p>What is actually asserted: equivalence with the input, non-decreasing
+ * partition min across consecutive output manifests, and non-decreasing
+ * partition field 0 within each output manifest. SortedRun and Section
+ * grouping is not inspected directly; boundary-equality cases are only
+ * printed to stdout.
+ */
+ @Test
+ public void testBoundaryEqualityHandling() {
+ List<ManifestFileMeta> input = new ArrayList<>();
+
+ // Manifest A: partitions [0, 5]
+ List<ManifestEntry> entriesA = new ArrayList<>();
+ for (int p = 0; p <= 5; p++) {
+ entriesA.add(makeEntry(true, String.format("A-p%d", p), p));
+ }
+ input.add(makeManifest(entriesA.toArray(new ManifestEntry[0])));
+
+ // Manifest B: partitions [5, 10] - boundary touches A (min == A.max)
+ List<ManifestEntry> entriesB = new ArrayList<>();
+ for (int p = 5; p <= 10; p++) {
+ entriesB.add(makeEntry(true, String.format("B-p%d", p), p));
+ }
+ input.add(makeManifest(entriesB.toArray(new ManifestEntry[0])));
+
+ // Manifest C: partitions [10, 15] - boundary touches B (min == B.max)
+ List<ManifestEntry> entriesC = new ArrayList<>();
+ for (int p = 10; p <= 15; p++) {
+ entriesC.add(makeEntry(true, String.format("C-p%d", p), p));
+ }
+ input.add(makeManifest(entriesC.toArray(new ManifestEntry[0])));
+
+ Options testOptions = new Options();
+ testOptions.set("manifest-sort.enabled", "true");
+
+ List<ManifestFileMeta> merged =
+ ManifestFileMerger.merge(
+ input,
+ manifestFile,
+ getPartitionType(),
+ CoreOptions.fromMap(testOptions.toMap()));
+
+ // Verify entries are equivalent
+ assertEquivalentEntries(input, merged);
+
+ // Verify all manifests maintain global sort order
+ for (int i = 1; i < merged.size(); i++) {
+ BinaryRow prevMin = merged.get(i - 1).partitionStats().minValues();
+ BinaryRow prevMax = merged.get(i - 1).partitionStats().maxValues();
+ BinaryRow currMin = merged.get(i).partitionStats().minValues();
+
+ // Boundary-touching is allowed: currMin >= prevMin
+ assertThat(currMin.getInt(0))
+ .as("Global order should be maintained with
boundary-touching allowed")
+ .isGreaterThanOrEqualTo(prevMin.getInt(0));
+
+ // Log boundary equality cases for documentation. This is
+ // diagnostic output only: nothing is asserted in this branch.
+ if (currMin.getInt(0) == prevMax.getInt(0)) {
+ System.out.println(
+ String.format(
+ "Boundary equality detected:
manifest[%d].min=%d == manifest[%d].max=%d",
+ i, currMin.getInt(0), i - 1,
prevMax.getInt(0)));
+ }
+ }
+
+ // Verify entries within each manifest are sorted
+ for (ManifestFileMeta meta : merged) {
+ List<ManifestEntry> entries = manifestFile.read(meta.fileName(),
meta.fileSize());
+ for (int i = 1; i < entries.size(); i++) {
+ int prevPartition = entries.get(i - 1).partition().getInt(0);
+ int currPartition = entries.get(i).partition().getInt(0);
+ assertThat(currPartition)
+ .as("Entries within manifest should be sorted by
partition")
+ .isGreaterThanOrEqualTo(prevPartition);
+ }
+ }
+ }
+
+ /**
+ * Create a ManifestEntry with a 3-field partition row (region, dt, hour).
+ *
+ * <p>The same BinaryRow is passed both as the entry's partition and twice
+ * more to DataFileMeta.create. All remaining arguments are fixed
+ * placeholders (zeros, nulls, empty stats, an empty list, a constant
+ * timestamp and FileSource.APPEND); their meaning is defined by the
+ * ManifestEntry.create and DataFileMeta.create signatures, which are not
+ * visible here.
+ *
+ * @param isAdd true to create a FileKind.ADD entry, false for
+ * FileKind.DELETE
+ * @param fileName file name given to the DataFileMeta
+ * @param region value written to partition field 0
+ * @param dt value written to partition field 1
+ * @param hour value written to partition field 2
+ * @return the constructed entry
+ */
+ private ManifestEntry makeMultiPartEntry(
+ boolean isAdd, String fileName, int region, int dt, int hour) {
+ BinaryRow binaryRow = new BinaryRow(3);
+ BinaryRowWriter writer = new BinaryRowWriter(binaryRow);
+ writer.writeInt(0, region);
+ writer.writeInt(1, dt);
+ writer.writeInt(2, hour);
+ writer.complete();
+
+ return ManifestEntry.create(
+ isAdd ? FileKind.ADD : FileKind.DELETE,
+ binaryRow,
+ 0,
+ 0,
+ DataFileMeta.create(
+ fileName,
+ 0,
+ 0,
+ binaryRow,
+ binaryRow,
+ StatsTestUtils.newEmptySimpleStats(),
+ StatsTestUtils.newEmptySimpleStats(),
+ 0,
+ 0,
+ 0,
+ 0,
+ Collections.emptyList(),
+ Timestamp.fromEpochMillis(200000),
+ 0L,
+ null,
+ FileSource.APPEND,
+ null,
+ null,
+ null,
+ null));
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/NoPartitionManifestFileMetaTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/NoPartitionManifestFileMetaTest.java
index 591b320651..66465f1e75 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/NoPartitionManifestFileMetaTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/NoPartitionManifestFileMetaTest.java
@@ -18,7 +18,9 @@
package org.apache.paimon.manifest;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.operation.ManifestFileMerger;
+import org.apache.paimon.options.Options;
import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.BeforeEach;
@@ -49,9 +51,16 @@ public class NoPartitionManifestFileMetaTest extends
ManifestFileMetaTestBase {
List<ManifestFileMeta> input = createBaseManifestFileMetas(false);
addDeltaManifests(input, false);
+ Options testOptions = new Options();
+ testOptions.set("manifest.target-file-size", "500B");
+ testOptions.set("manifest.merge-min-count", "3");
+ testOptions.set("manifest.full-compaction-threshold-size", "200B");
List<ManifestFileMeta> merged =
ManifestFileMerger.merge(
- input, manifestFile, 500, 3, 200, getPartitionType(),
null);
+ input,
+ manifestFile,
+ getPartitionType(),
+ CoreOptions.fromMap(testOptions.toMap()));
assertEquivalentEntries(input, merged);
// the first one is not deleted, it should not be merged
@@ -89,9 +98,16 @@ public class NoPartitionManifestFileMetaTest extends
ManifestFileMetaTestBase {
input.add(makeManifest(makeEntry(true, "F", null)));
input.add(makeManifest(makeEntry(true, "G", null)));
+ Options testOptions = new Options();
+ testOptions.set("manifest.target-file-size", threshold + "B");
+ testOptions.set("manifest.merge-min-count", "3");
+ testOptions.set("manifest.full-compaction-threshold-size", "200B");
List<ManifestFileMeta> merged =
ManifestFileMerger.merge(
- input, manifestFile, threshold, 3, 200,
getPartitionType(), null);
+ input,
+ manifestFile,
+ getPartitionType(),
+ CoreOptions.fromMap(testOptions.toMap()));
assertEquivalentEntries(
input.stream()
.filter(f -> !baseFiles.contains(f.fileName()))
diff --git
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
index 14652ec883..beb4bfd376 100644
---
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
@@ -511,6 +511,68 @@ class SchemaValidationTest {
new TableSchema(1, fields, 10, emptyList(),
singletonList("k"), options, ""));
}
+ /**
+ * Checks schema validation of the manifest-sort options in three cases:
+ * enabling the feature on a table the validator reports as non-partition
+ * is rejected, naming a sort field that is not a partition field is
+ * rejected, and naming a sort field that is a partition field is accepted.
+ * All three cases use the same two INT fields (f0, f1) and bucket = -1.
+ */
+ @Test
+ void testManifestSortValidation() {
+ List<DataField> fields =
+ Arrays.asList(
+ new DataField(0, "f0", DataTypes.INT()),
+ new DataField(1, "f1", DataTypes.INT()));
+
+ // Test 1: manifest-sort.enabled on non-partition table should fail
+ Map<String, String> options1 = new HashMap<>();
+ options1.put(CoreOptions.MANIFEST_SORT_ENABLED.key(), "true");
+ options1.put(BUCKET.key(), String.valueOf(-1));
+ assertThatThrownBy(
+ () ->
+ validateTableSchema(
+ new TableSchema(
+ 1,
+ fields,
+ 10,
+ emptyList(),
+ emptyList(),
+ options1,
+ "")))
+ .hasMessageContaining(
+ "Cannot enable 'manifest-sort.enabled' for
non-partition table.");
+
+ // Test 2: a 'manifest-sort.partition-field' value (f1) that is not in
+ // the partition keys (f0) should fail
+ Map<String, String> options2 = new HashMap<>();
+ options2.put(CoreOptions.MANIFEST_SORT_ENABLED.key(), "true");
+ options2.put(CoreOptions.MANIFEST_SORT_PARTITION_FIELD.key(), "f1");
+ options2.put(BUCKET.key(), String.valueOf(-1));
+ assertThatThrownBy(
+ () ->
+ validateTableSchema(
+ new TableSchema(
+ 1,
+ fields,
+ 10,
+ singletonList("f0"),
+ emptyList(),
+ options2,
+ "")))
+ .hasMessageContaining("is not a partition field");
+
+ // Test 3: valid manifest-sort config should pass
+ Map<String, String> options3 = new HashMap<>();
+ options3.put(CoreOptions.MANIFEST_SORT_ENABLED.key(), "true");
+ options3.put(CoreOptions.MANIFEST_SORT_PARTITION_FIELD.key(), "f0");
+ options3.put(BUCKET.key(), String.valueOf(-1));
+ assertThatNoException()
+ .isThrownBy(
+ () ->
+ validateTableSchema(
+ new TableSchema(
+ 1,
+ fields,
+ 10,
+ singletonList("f0"),
+ emptyList(),
+ options3,
+ "")));
+ }
+
@Test
public void testMergeOnReadCoexistsWithVisibilityCallback() {
Map<String, String> options = new HashMap<>();