This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 48f9e55d1d [core] Minor refactor for manifest sort code (#8119)
48f9e55d1d is described below
commit 48f9e55d1d647fc821e9624af6a1af2153260cf6
Author: umi <[email protected]>
AuthorDate: Mon Jun 8 19:40:06 2026 +0800
[core] Minor refactor for manifest sort code (#8119)
---
.../paimon/operation/ManifestFileSorter.java | 365 +++++++++++++--------
.../paimon/manifest/ManifestFileMetaTest.java | 2 +-
2 files changed, 229 insertions(+), 138 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileSorter.java
b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileSorter.java
index 39ef0bab52..bbdf5e14bc 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileSorter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileSorter.java
@@ -66,7 +66,16 @@ public class ManifestFileSorter {
final boolean fullCompaction;
final RecordComparator fieldComparator;
final Set<FileEntry.Identifier> deleteEntries;
- final Map<ManifestFileMeta, Boolean> defaultCompactionMap;
+ /**
+ * Manifest files that need unsorted compaction.
+ *
+ * <p>Key: manifest file metadata
+ *
+ * <p>Value: true if fullCompaction is true and the file overlaps with
delete partitions. It
+ * means the file must be rewritten to eliminate its delete entries.
+ */
+ final Map<ManifestFileMeta, Boolean> compactWithoutSort;
+
final List<ManifestAdjacentSortedRun> levelRuns;
final List<ManifestAdjacentSortedRun> pickedRuns;
@@ -74,31 +83,44 @@ public class ManifestFileSorter {
boolean fullCompaction,
RecordComparator fieldComparator,
Set<FileEntry.Identifier> deleteEntries,
- Map<ManifestFileMeta, Boolean> defaultCompactionMap,
+ Map<ManifestFileMeta, Boolean> compactWithoutSort,
List<ManifestAdjacentSortedRun> levelRuns,
List<ManifestAdjacentSortedRun> pickedRuns) {
this.fullCompaction = fullCompaction;
this.fieldComparator = fieldComparator;
this.deleteEntries = deleteEntries;
- this.defaultCompactionMap = defaultCompactionMap;
+ this.compactWithoutSort = compactWithoutSort;
this.levelRuns = levelRuns;
this.pickedRuns = pickedRuns;
}
+
+ /** Check whether the given manifest file is marked for unsorted
compaction. */
+ boolean isMarkedForUnsortedCompaction(ManifestFileMeta file) {
+ return compactWithoutSort.containsKey(file);
+ }
}
/** Result of classifying manifest files. */
private static class ClassifyResult {
final List<ManifestFileMeta> lsmFiles;
final Set<FileEntry.Identifier> deleteEntries;
- final Map<ManifestFileMeta, Boolean> defaultCompactionMap;
+ /**
+ * Manifest files that need unsorted compaction.
+ *
+ * <p>Key: manifest file metadata
+ *
+ * <p>Value: true if fullCompaction is true and the file overlaps with
delete partitions. It
+ * means the file must be rewritten to eliminate its delete entries.
+ */
+ final Map<ManifestFileMeta, Boolean> compactWithoutSort;
ClassifyResult(
List<ManifestFileMeta> lsmFiles,
Set<FileEntry.Identifier> deleteEntries,
- Map<ManifestFileMeta, Boolean> defaultCompactionMap) {
+ Map<ManifestFileMeta, Boolean> compactWithoutSort) {
this.lsmFiles = lsmFiles;
this.deleteEntries = deleteEntries;
- this.defaultCompactionMap = defaultCompactionMap;
+ this.compactWithoutSort = compactWithoutSort;
}
}
@@ -159,7 +181,7 @@ public class ManifestFileSorter {
/**
* Full compaction path: totalDeltaFileSize >= sizeTrigger.
*
- * <p>Does not build index mapping. sortAndRewriteSection writes all
entries (ADD+DELETE merged)
+ * <p>Does not build index mapping. rewriteSection writes all entries
(ADD+DELETE merged)
* together without separating them.
*/
private static Optional<List<ManifestFileMeta>> tryFullCompaction(
@@ -201,19 +223,19 @@ public class ManifestFileSorter {
List<ManifestAdjacentSortedRun> levelRuns = ctx.levelRuns;
List<ManifestAdjacentSortedRun> pickedRuns = ctx.pickedRuns;
- if (pickedRuns.isEmpty() && ctx.defaultCompactionMap.isEmpty()) {
+ if (pickedRuns.isEmpty() && ctx.compactWithoutSort.isEmpty()) {
LOG.debug(
- "Manifest sort full compact skipped: no runs picked and no
defaultCompaction files.");
+ "Manifest sort full compact skipped: no runs picked and no
compactWithoutSort files.");
return Optional.empty();
}
LOG.info(
"Manifest sort full compact: input={} files, lsm={} runs,
picked={} runs, "
- + "defaultCompaction={} files.",
+ + "compactWithoutSort={} files.",
input.size(),
levelRuns.size(),
pickedRuns.size(),
- ctx.defaultCompactionMap.size());
+ ctx.compactWithoutSort.size());
// Step 3: Collect reused files (not picked) and picked files
Set<ManifestAdjacentSortedRun> pickedSet = new HashSet<>(pickedRuns);
@@ -227,11 +249,10 @@ public class ManifestFileSorter {
for (ManifestAdjacentSortedRun run : pickedRuns) {
pickedFiles.addAll(run.files());
}
- pickedFiles.addAll(ctx.defaultCompactionMap.keySet());
+ pickedFiles.addAll(ctx.compactWithoutSort.keySet());
// Step 4: Split into sections and merge small adjacent sections
- List<Section> sections =
- splitIntoSections(pickedFiles, ctx.fieldComparator,
ctx.defaultCompactionMap);
+ List<Section> sections = splitIntoSections(pickedFiles, ctx);
sections = mergeSmallAdjacentSections(sections, suggestedMetaSize);
LOG.info(
@@ -262,8 +283,8 @@ public class ManifestFileSorter {
/**
* Minor compaction path: totalDeltaFileSize < sizeTrigger.
*
- * <p>Builds index mapping to preserve original positions.
sortAndRewriteSection separates ADD
- * and DELETE entries, placing ADD at result[minIdx] and DELETE at
result[maxIdx].
+ * <p>Builds index mapping to preserve original positions. rewriteSection
separates ADD and
+ * DELETE entries, placing ADD at result[minIdx] and DELETE at
result[maxIdx].
*/
private static List<ManifestFileMeta> tryMinorCompaction(
List<ManifestFileMeta> input,
@@ -293,19 +314,19 @@ public class ManifestFileSorter {
List<ManifestAdjacentSortedRun> levelRuns = ctx.levelRuns;
List<ManifestAdjacentSortedRun> pickedRuns = ctx.pickedRuns;
- if (pickedRuns.isEmpty() && ctx.defaultCompactionMap.isEmpty()) {
+ if (pickedRuns.isEmpty() && ctx.compactWithoutSort.isEmpty()) {
LOG.debug(
- "Manifest sort minor compact skipped: no runs picked and
no defaultCompaction files.");
+ "Manifest sort minor compact skipped: no runs picked and
no compactWithoutSort files.");
return input;
}
LOG.info(
"Manifest sort minor compact: input={} files, lsm={} runs,
picked={} runs, "
- + "defaultCompaction={} files.",
+ + "compactWithoutSort={} files.",
input.size(),
levelRuns.size(),
pickedRuns.size(),
- ctx.defaultCompactionMap.size());
+ ctx.compactWithoutSort.size());
// Step 2: Build fileName -> index mapping and initialize 2D result
Map<String, Integer> fileNameToIndex = new HashMap<>();
@@ -332,7 +353,7 @@ public class ManifestFileSorter {
for (ManifestAdjacentSortedRun run : pickedRuns) {
pickedFiles.addAll(run.files());
}
- pickedFiles.addAll(ctx.defaultCompactionMap.keySet());
+ pickedFiles.addAll(ctx.compactWithoutSort.keySet());
// Step 4: Compute index range
int minIdx = Integer.MAX_VALUE;
@@ -347,8 +368,7 @@ public class ManifestFileSorter {
Pair<Integer, Integer> indexRange = Pair.of(minIdx, maxIdx);
// Step 5: Split into sections and merge small adjacent sections
- List<Section> sections =
- splitIntoSections(pickedFiles, ctx.fieldComparator,
ctx.defaultCompactionMap);
+ List<Section> sections = splitIntoSections(pickedFiles, ctx);
sections = mergeSmallAdjacentSections(sections, suggestedMetaSize);
LOG.info(
@@ -436,7 +456,7 @@ public class ManifestFileSorter {
fullCompaction,
fieldComparator,
classifyResult.deleteEntries,
- classifyResult.defaultCompactionMap,
+ classifyResult.compactWithoutSort,
levelRuns,
pickedRuns);
}
@@ -445,12 +465,12 @@ public class ManifestFileSorter {
* Classify manifest files into default-compaction group and LSM group.
*
* <p>Full compaction: small files and files overlapping delete partitions
go into
- * defaultCompactionMap; the rest are returned as lsmFiles.
+ * compactWithoutSort; the rest are returned as lsmFiles.
*
- * <p>Non-full compaction: small files go to defaultCompactionMap for
minor-style merge; the
- * rest are returned as lsmFiles.
+ * <p>Non-full compaction: small files go to compactWithoutSort for
minor-style merge; the rest
+ * are returned as lsmFiles.
*
- * @return ClassifyResult containing lsmFiles, deleteEntries, and
defaultCompactionMap
+ * @return ClassifyResult containing lsmFiles, deleteEntries, and
compactWithoutSort
*/
private static ClassifyResult classifyManifests(
List<ManifestFileMeta> input,
@@ -460,7 +480,7 @@ public class ManifestFileSorter {
long suggestedMetaSize,
@Nullable Integer manifestReadParallelism) {
// Initialize classification containers and read delete entries
- Map<ManifestFileMeta, Boolean> classifiedDefaultMap = new
LinkedHashMap<>();
+ Map<ManifestFileMeta, Boolean> compactWithoutSort = new
LinkedHashMap<>();
List<ManifestFileMeta> lsmFiles = new LinkedList<>(input);
Set<FileEntry.Identifier> classifiedDeleteEntries =
Collections.emptySet();
PartitionPredicate predicate = null;
@@ -496,11 +516,11 @@ public class ManifestFileSorter {
file.partitionStats().nullCounts());
if (small || inDeleteRange) {
iterator.remove();
- classifiedDefaultMap.put(file, inDeleteRange);
+ compactWithoutSort.put(file, inDeleteRange);
}
}
- return new ClassifyResult(lsmFiles, classifiedDeleteEntries,
classifiedDefaultMap);
+ return new ClassifyResult(lsmFiles, classifiedDeleteEntries,
compactWithoutSort);
}
/**
@@ -588,12 +608,11 @@ public class ManifestFileSorter {
/**
* Split picked files into sections. Files with overlapping sort-key
intervals go into the same
- * section. Each section is built with pre-computed totalSize and
hasDefaultCompactMeta.
+ * section. Each section is built with pre-computed totalSize and
hasUnsortedCompactMeta.
*/
static List<Section> splitIntoSections(
- List<ManifestFileMeta> pickedFiles,
- RecordComparator fieldComparator,
- Map<ManifestFileMeta, Boolean> defaultCompactionMap) {
+ List<ManifestFileMeta> pickedFiles, CompactionContext ctx) {
+ RecordComparator fieldComparator = ctx.fieldComparator;
pickedFiles.sort(
(a, b) -> {
int cmp =
@@ -607,13 +626,13 @@ public class ManifestFileSorter {
});
List<Section> sections = new ArrayList<>();
- List<ManifestFileMeta> currentFiles = new ArrayList<>();
- long currentTotalSize = 0;
- boolean currentHasDefault = false;
+ List<ManifestFileMeta> currentSectionFiles = new ArrayList<>();
+ long currentSectionTotalSize = 0;
ManifestFileMeta first = pickedFiles.get(0);
- currentFiles.add(first);
- currentTotalSize += first.fileSize();
- currentHasDefault = defaultCompactionMap.containsKey(first);
+
+ currentSectionFiles.add(first);
+ currentSectionTotalSize += first.fileSize();
+ boolean currentSectionHasUnsortedCompactMeta =
ctx.isMarkedForUnsortedCompaction(first);
BinaryRow sectionMaxBound = first.partitionStats().maxValues();
for (int i = 1; i < pickedFiles.size(); i++) {
@@ -633,18 +652,24 @@ public class ManifestFileSorter {
// they may be placed in the same SortedRun during
buildLevelSortedRuns (which uses >= 0
// comparison). This dual behavior is intentional and documented
in class comments.
if (fieldComparator.compare(file.partitionStats().minValues(),
sectionMaxBound) >= 0) {
- sections.add(new Section(currentFiles, currentTotalSize,
currentHasDefault));
- currentFiles = new ArrayList<>();
- currentTotalSize = 0;
- currentFiles.add(file);
- currentTotalSize += file.fileSize();
- currentHasDefault = defaultCompactionMap.containsKey(file);
+ sections.add(
+ new Section(
+ currentSectionFiles,
+ currentSectionTotalSize,
+ currentSectionHasUnsortedCompactMeta));
+ // start a new section
+ currentSectionFiles = new ArrayList<>();
+ currentSectionTotalSize = 0;
+ currentSectionFiles.add(file);
+ currentSectionTotalSize += file.fileSize();
+ currentSectionHasUnsortedCompactMeta =
ctx.isMarkedForUnsortedCompaction(file);
sectionMaxBound = file.partitionStats().maxValues();
} else {
- currentFiles.add(file);
- currentTotalSize += file.fileSize();
- if (!currentHasDefault &&
defaultCompactionMap.containsKey(file)) {
- currentHasDefault = true;
+ currentSectionFiles.add(file);
+ currentSectionTotalSize += file.fileSize();
+ if (!currentSectionHasUnsortedCompactMeta
+ && ctx.isMarkedForUnsortedCompaction(file)) {
+ currentSectionHasUnsortedCompactMeta = true;
}
if (fieldComparator.compare(file.partitionStats().maxValues(),
sectionMaxBound)
> 0) {
@@ -652,7 +677,11 @@ public class ManifestFileSorter {
}
}
}
- sections.add(new Section(currentFiles, currentTotalSize,
currentHasDefault));
+ sections.add(
+ new Section(
+ currentSectionFiles,
+ currentSectionTotalSize,
+ currentSectionHasUnsortedCompactMeta));
return sections;
}
@@ -695,14 +724,14 @@ public class ManifestFileSorter {
* <li>First overflow: The current section is split. The rewritable part
is sorted and
* rewritten. The remaining part is appended back to the sections
queue for later
* processing.
- * <li>Subsequent overflows: If the section has files in
defaultCompactionMap (needs default
- * compaction), rewriteSubSegments is called to process it in
smaller chunks. Otherwise,
- * the section is skipped.
+ * <li>Subsequent overflows: If the section has files in
compactWithoutSort (needs unsorted
+ * compaction), unsortedCompactSection is called to process it in
smaller chunks.
+ * Otherwise, the section is skipped.
* </ul>
*
* <p>This design ensures that the budget only limits the aggressive sort
rewrite, while still
* allowing necessary cleanup operations (delete entry elimination, small
file merge) through
- * the rewriteSubSegments fallback path.
+ * the unsortedCompactSection fallback path.
*/
private static void rewriteSections(
List<Section> sections,
@@ -715,13 +744,16 @@ public class ManifestFileSorter {
long maxRewriteSize,
@Nullable Integer manifestReadParallelism)
throws Exception {
- long processedSize = 0;
- boolean reachedLimit = false;
+ // Total data size that has been sort-rewritten so far, used to
enforce maxRewriteSize.
+ long currentRewrittenSize = 0;
+ boolean budgetExhausted = false; // Whether currentRewrittenSize
reaches maxRewriteSize.
for (int i = 0; i < sections.size(); i++) {
Section section = sections.get(i);
+
+ // A single-file section is always handled directly, regardless of
the budget.
if (section.files.size() == 1) {
- sortAndRewriteSection(
+ rewriteSection(
section.files,
output,
sortNewFiles,
@@ -731,55 +763,45 @@ public class ManifestFileSorter {
continue;
}
- if (processedSize + section.totalSize <= maxRewriteSize) {
- processedSize += section.totalSize;
- sortAndRewriteSection(
- section.files,
- output,
- sortNewFiles,
- ctx,
- manifestFile,
- manifestReadParallelism);
- } else if (!reachedLimit) {
- long rewriteTotalSize = maxRewriteSize - processedSize;
- processedSize += section.totalSize;
- List<ManifestFileMeta> rewriteFiles = new ArrayList<>();
- List<ManifestFileMeta> remainingFiles = new ArrayList<>();
- long rewriteSize = 0;
- long remainingSize = 0;
- boolean remainingHasDefault = false;
-
- for (ManifestFileMeta file : section.files) {
- if (rewriteSize + file.fileSize() <= rewriteTotalSize) {
- rewriteFiles.add(file);
- rewriteSize += file.fileSize();
- } else {
- remainingFiles.add(file);
- remainingSize += file.fileSize();
- if (ctx.defaultCompactionMap.containsKey(file)) {
- remainingHasDefault = true;
- }
+ // Phase 1: budget not yet exhausted -- perform aggressive sort
rewrite.
+ if (!budgetExhausted) {
+ // Phase 1a: section fits within the remaining budget -- sort
and rewrite it
+ // wholly.
+ if (currentRewrittenSize + section.totalSize <=
maxRewriteSize) {
+ currentRewrittenSize += section.totalSize;
+ rewriteSection(
+ section.files,
+ output,
+ sortNewFiles,
+ ctx,
+ manifestFile,
+ manifestReadParallelism);
+ } else {
+ // Phase 1b: first overflow -- split the section at the
budget boundary,
+ // rewrite the affordable head, and append the remaining
tail back for later
+ // (Phase 2) handling.
+ long remainingBudget = maxRewriteSize -
currentRewrittenSize;
+ currentRewrittenSize += section.totalSize;
+ Section remaining =
+ splitSectionAndRewriteHead(
+ section,
+ remainingBudget,
+ output,
+ sortNewFiles,
+ ctx,
+ manifestFile,
+ manifestReadParallelism);
+ if (remaining != null) {
+ // global ManifestMeta section order by sort key is
not a required invariant
+ sections.add(remaining);
}
+ budgetExhausted = true;
}
-
- sortAndRewriteSection(
- rewriteFiles,
- output,
- sortNewFiles,
- ctx,
- manifestFile,
- manifestReadParallelism);
-
- if (!remainingFiles.isEmpty()) {
- Section remainingSection =
- new Section(remainingFiles, remainingSize,
remainingHasDefault);
- // global manifest file metas order by sort key is not a
required invariant
- sections.add(remainingSection);
- }
- reachedLimit = true;
- } else if (section.hasDefaultCompactMeta) {
- rewriteSubSegments(
- section.files,
+ } else {
+ // Phase 2: budget already exhausted -- only do unsorted
compact, skip aggressive
+ // sort rewrite.
+ rewriteSectionBeyondBudget(
+ section,
output,
sortNewFiles,
ctx,
@@ -787,10 +809,81 @@ public class ManifestFileSorter {
suggestedMetaSize,
suggestedMinMetaCount,
manifestReadParallelism);
+ }
+ }
+ }
+
+ /**
+ * Split a section at the rewrite budget boundary: sort and rewrite the
head part that fits
+ * within the remaining budget, and return the remaining tail as a new
Section (or null if the
+ * whole section fits and no tail is left).
+ */
+ private static Section splitSectionAndRewriteHead(
+ Section section,
+ long remainingBudget,
+ RewriteOutput output,
+ List<ManifestFileMeta> sortNewFiles,
+ CompactionContext ctx,
+ ManifestFile manifestFile,
+ @Nullable Integer manifestReadParallelism)
+ throws Exception {
+ List<ManifestFileMeta> headFiles = new ArrayList<>();
+ List<ManifestFileMeta> tailFiles = new ArrayList<>();
+ long headSize = 0;
+ long tailSize = 0;
+ // Whether the tail section has files in compactWithoutSort; if true, the
section needs to
+ // be rewritten.
+ boolean tailHasUnsortedCompactMeta = false;
+
+ for (ManifestFileMeta file : section.files) {
+ if (headSize + file.fileSize() <= remainingBudget) {
+ headFiles.add(file);
+ headSize += file.fileSize();
} else {
- output.addAllUnchanged(section.files);
+ tailFiles.add(file);
+ tailSize += file.fileSize();
+ if (ctx.isMarkedForUnsortedCompaction(file)) {
+ tailHasUnsortedCompactMeta = true;
+ }
}
}
+
+ rewriteSection(headFiles, output, sortNewFiles, ctx, manifestFile,
manifestReadParallelism);
+
+ if (tailFiles.isEmpty()) {
+ return null;
+ }
+ return new Section(tailFiles, tailSize, tailHasUnsortedCompactMeta);
+ }
+
+ /**
+ * Handle a section after the sort rewrite budget is exhausted. Sections
that contain
+ * compactWithoutSort files (small files / files overlapping delete partitions) still go through
+ * unsortedCompactSection for necessary cleanup; otherwise they are kept
unchanged.
+ */
+ private static void rewriteSectionBeyondBudget(
+ Section section,
+ RewriteOutput output,
+ List<ManifestFileMeta> sortNewFiles,
+ CompactionContext ctx,
+ ManifestFile manifestFile,
+ long suggestedMetaSize,
+ int suggestedMinMetaCount,
+ @Nullable Integer manifestReadParallelism)
+ throws Exception {
+ if (section.hasUnsortedCompactMeta) {
+ unsortedCompactSection(
+ section.files,
+ output,
+ sortNewFiles,
+ ctx,
+ manifestFile,
+ suggestedMetaSize,
+ suggestedMinMetaCount,
+ manifestReadParallelism);
+ } else {
+ output.addAllUnchanged(section.files);
+ }
}
/**
@@ -798,8 +891,8 @@ public class ManifestFileSorter {
*
* <p><b>Semantics difference from old minor merge:</b> In the old
ManifestFileMerger path, the
* trailing candidates are kept unchanged when their count is below
manifest.merge-min-count. In
- * this sort path, rewriteSubSegments is triggered when
defaultCompactionMap is non-empty,
- * regardless of the manifest count. This is because files in
defaultCompactionMap either:
+ * this sort path, unsortedCompactSection is triggered when
compactWithoutSort is non-empty,
+ * regardless of the manifest count. This is because files in
compactWithoutSort either:
*
* <ul>
* <li>Are small files needing consolidation
@@ -810,7 +903,7 @@ public class ManifestFileSorter {
* acting as a conservative gate to avoid unnecessary rewrite when there
are no delete entries
* and the tail is too small.
*/
- private static void rewriteSubSegments(
+ private static void unsortedCompactSection(
List<ManifestFileMeta> section,
RewriteOutput output,
List<ManifestFileMeta> sortNewFiles,
@@ -820,47 +913,47 @@ public class ManifestFileSorter {
int suggestedMinMetaCount,
@Nullable Integer manifestReadParallelism)
throws Exception {
- List<ManifestFileMeta> subSegment = new ArrayList<>();
- long subSegmentSize = 0;
+ List<ManifestFileMeta> candidates = new ArrayList<>();
+ long candidatesSize = 0;
for (ManifestFileMeta m : section) {
- subSegmentSize += m.fileSize();
- subSegment.add(m);
+ candidatesSize += m.fileSize();
+ candidates.add(m);
- if (subSegmentSize >= suggestedMetaSize) {
- sortAndRewriteSection(
- subSegment,
+ if (candidatesSize >= suggestedMetaSize) {
+ rewriteSection(
+ candidates,
output,
sortNewFiles,
ctx,
manifestFile,
manifestReadParallelism);
- subSegment.clear();
- subSegmentSize = 0;
+ candidates.clear();
+ candidatesSize = 0;
}
}
// Flush tail only if delete entries exist or file count >= minCount.
- if (!subSegment.isEmpty()) {
- if (!ctx.deleteEntries.isEmpty() || subSegment.size() >=
suggestedMinMetaCount) {
- sortAndRewriteSection(
- subSegment,
+ if (!candidates.isEmpty()) {
+ if (!ctx.deleteEntries.isEmpty() || candidates.size() >=
suggestedMinMetaCount) {
+ rewriteSection(
+ candidates,
output,
sortNewFiles,
ctx,
manifestFile,
manifestReadParallelism);
} else {
- output.addAllUnchanged(subSegment);
+ output.addAllUnchanged(candidates);
}
}
}
/**
- * Sort and rewrite a section. Dispatches to full or minor compact path.
+ * Rewrite a section. Dispatches to full or minor compact path.
*
* <p>sortNewFiles is the same reference as newFilesForAbort, ensuring
newly written files are
* cleaned up on exception by the caller's catch block.
*/
- private static void sortAndRewriteSection(
+ private static void rewriteSection(
List<ManifestFileMeta> section,
RewriteOutput output,
List<ManifestFileMeta> sortNewFiles,
@@ -869,17 +962,15 @@ public class ManifestFileSorter {
@Nullable Integer manifestReadParallelism)
throws Exception {
// Skip rewrite for single file not in delete-range.
- if (section.size() == 1 &&
!ctx.defaultCompactionMap.getOrDefault(section.get(0), false)) {
+ if (section.size() == 1 &&
!ctx.compactWithoutSort.getOrDefault(section.get(0), false)) {
output.addUnchanged(section.get(0));
return;
}
if (ctx.fullCompaction) {
- sortAndRewriteFull(
- section, output, sortNewFiles, ctx, manifestFile,
manifestReadParallelism);
+ rewriteFull(section, output, sortNewFiles, ctx, manifestFile,
manifestReadParallelism);
} else {
- sortAndRewriteMinor(
- section, output, sortNewFiles, ctx, manifestFile,
manifestReadParallelism);
+ rewriteMinor(section, output, sortNewFiles, ctx, manifestFile,
manifestReadParallelism);
}
}
@@ -887,7 +978,7 @@ public class ManifestFileSorter {
* Full compaction path: read all surviving entries (ADD merged with
DELETE), sort them
* together, and write to output as a single sorted stream.
*/
- private static void sortAndRewriteFull(
+ private static void rewriteFull(
List<ManifestFileMeta> section,
RewriteOutput output,
List<ManifestFileMeta> sortNewFiles,
@@ -934,7 +1025,7 @@ public class ManifestFileSorter {
* entries into ADD and DELETE within each file, returning a Pair. Results
are merged in the
* main thread.
*/
- private static void sortAndRewriteMinor(
+ private static void rewriteMinor(
List<ManifestFileMeta> section,
RewriteOutput output,
List<ManifestFileMeta> sortNewFiles,
@@ -1128,12 +1219,12 @@ public class ManifestFileSorter {
static class Section {
final List<ManifestFileMeta> files;
final long totalSize;
- final boolean hasDefaultCompactMeta;
+ final boolean hasUnsortedCompactMeta;
- Section(List<ManifestFileMeta> files, long totalSize, boolean
hasDefaultCompactMeta) {
+ Section(List<ManifestFileMeta> files, long totalSize, boolean
hasUnsortedCompactMeta) {
this.files = files;
this.totalSize = totalSize;
- this.hasDefaultCompactMeta = hasDefaultCompactMeta;
+ this.hasUnsortedCompactMeta = hasUnsortedCompactMeta;
}
/** Create a merged section from two sections. */
@@ -1143,7 +1234,7 @@ public class ManifestFileSorter {
return new Section(
merged,
a.totalSize + b.totalSize,
- a.hasDefaultCompactMeta || b.hasDefaultCompactMeta);
+ a.hasUnsortedCompactMeta || b.hasUnsortedCompactMeta);
}
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
index 75a1ab0a84..2b91824d9a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
@@ -964,7 +964,7 @@ public class ManifestFileMetaTest extends
ManifestFileMetaTestBase {
* Test that sort rewrite correctly eliminates DELETE entries and their
corresponding ADD
* entries. The key condition is that totalDeltaFileSize must reach
manifestFullCompactionSize
* to trigger the full compaction path inside trySortRewrite, which reads
deleteEntries and
- * passes them to sortAndRewriteSection for elimination.
+ * passes them to rewriteSection for elimination.
*
* <p>Design:
*