discivigour commented on code in PR #7842: URL: https://github.com/apache/paimon/pull/7842#discussion_r3264555497
########## paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileSorter.java: ########## @@ -0,0 +1,785 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.io.RollingFileWriter; +import org.apache.paimon.manifest.FileEntry; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ManifestFile; +import org.apache.paimon.manifest.ManifestFileMeta; +import org.apache.paimon.operation.ManifestFileMerger.FullCompactionReadResult; +import org.apache.paimon.partition.PartitionPredicate; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Filter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.function.Function; + +import static java.util.Collections.singletonList; +import static org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute; + +/** Manifest file sorter that sorts and rewrites manifest files by a configured partition field. */ +public class ManifestFileSorter { + + private static final Logger LOG = LoggerFactory.getLogger(ManifestFileSorter.class); + + /** + * Try to sort-rewrite the merged manifest list by a configured partition field. If the sort + * field cannot be resolved or the delta file size is below the full compaction threshold, the + * input is returned as-is. + */ + static Optional<List<ManifestFileMeta>> trySortRewrite( + List<ManifestFileMeta> input, + List<ManifestFileMeta> newFilesForAbort, + ManifestFile manifestFile, + RowType partitionType, + CoreOptions options) + throws Exception { + // Extract configuration from options + long suggestedMetaSize = options.manifestTargetSize().getBytes(); + Integer manifestReadParallelism = options.scanManifestParallelism(); + String sortPartitionField = options.manifestSortPartitionField(); + // Step 1: Resolve sort field. + String sortField = resolveSortField(sortPartitionField, partitionType); + if (sortField == null) { + throw new IllegalArgumentException( + "Cannot resolve sort field for manifest sort rewrite."); + } + int sortFieldIndex = partitionType.getFieldNames().indexOf(sortField); + DataType sortFieldType = partitionType.getTypeAt(sortFieldIndex); + + // Step 2: Classify manifests into defaultCompaction and LSM. + ClassifyResult classified = + classifyManifests( + input, + suggestedMetaSize, + manifestFile, + partitionType, + manifestReadParallelism); + Map<ManifestFileMeta, boolean[]> defaultCompactionMap = + classified.defaultCompactionManifests; + List<ManifestFileMeta> lsmFiles = classified.lsmFiles; + Set<FileEntry.Identifier> deleteEntries = classified.deleteEntries; + + // Step 3: Build LSM Tree and assign levels (only for lsmFiles). + List<ManifestSortedRun> levelRuns = + lsmFiles.isEmpty() + ? new ArrayList<>() + : buildLevelSortedRuns(lsmFiles, sortFieldIndex, sortFieldType); + + // Step 4: Pick runs to compact. + ManifestPickStrategy pickStrategy = + new ManifestPickStrategy( + options.maxSizeAmplificationPercent(), options.sortedRunSizeRatio()); + List<ManifestSortedRun> pickedRuns = pickStrategy.pick(levelRuns); + + if (pickedRuns.isEmpty() && defaultCompactionMap.isEmpty()) { + LOG.debug( + "Manifest sort rewrite skipped: no runs picked and no defaultCompaction files."); + return Optional.empty(); + } + + LOG.info( + "Manifest sort rewrite: input={} files, lsm={} runs, picked={} runs, " + + "defaultCompaction={} files.", + input.size(), + levelRuns.size(), + pickedRuns.size(), + defaultCompactionMap.size()); + + Set<ManifestSortedRun> pickedSet = new HashSet<>(pickedRuns); + List<ManifestFileMeta> reusedFiles = new ArrayList<>(); + for (ManifestSortedRun run : levelRuns) { + if (!pickedSet.contains(run)) { + reusedFiles.addAll(run.files()); + } + } + List<ManifestFileMeta> result = new ArrayList<>(reusedFiles); + + // Step 5: Split picked files into sections, sort and rewrite each. + List<ManifestFileMeta> pickedFiles = new ArrayList<>(); + for (ManifestSortedRun run : pickedRuns) { + pickedFiles.addAll(run.files()); + } + pickedFiles.addAll(defaultCompactionMap.keySet()); + + List<Section> sections = + splitIntoSections(pickedFiles, sortFieldIndex, sortFieldType, defaultCompactionMap); + sections = mergeSmallAdjacentSections(sections, suggestedMetaSize); + + rewriteSections( + sections, + defaultCompactionMap, + manifestFile, + sortFieldIndex, + sortFieldType, + deleteEntries, + suggestedMetaSize, + options.manifestSortMaxRewriteSize(), + result, + newFilesForAbort, + manifestReadParallelism); + + LOG.info( + "Manifest sort rewrite completed: sections={}, newFiles={}, resultFiles={}.", + sections.size(), + newFilesForAbort.size(), + result.size()); + return Optional.of(result); + } + + /** + * Classify manifest files into default-compaction group and LSM group. + * + * <p>When full compaction is triggered (totalDeltaFileSize >= threshold), files that must + * change or overlap with delete partitions go into defaultCompactionManifests; the rest stay as + * lsmFiles. + * + * <p>When full compaction is NOT triggered, adjacent small manifests whose cumulative size + * reaches suggestedMetaSize are grouped into defaultCompactionManifests (minor-style pick). + */ + private static ClassifyResult classifyManifests( + List<ManifestFileMeta> input, + long suggestedMetaSize, + ManifestFile manifestFile, + RowType partitionType, + @Nullable Integer manifestReadParallelism) { + Map<ManifestFileMeta, boolean[]> defaultCompactionManifests = new LinkedHashMap<>(); + List<ManifestFileMeta> lsmFiles = new LinkedList<>(input); + Set<FileEntry.Identifier> deleteEntries = + FileEntry.readDeletedEntries(manifestFile, input, manifestReadParallelism); + + PartitionPredicate predicate; + if (deleteEntries.isEmpty()) { + predicate = PartitionPredicate.ALWAYS_FALSE; + } else { + if (partitionType.getFieldCount() > 0) { + Set<BinaryRow> deletePartitions = + ManifestFileMerger.computeDeletePartitions(deleteEntries); + predicate = PartitionPredicate.fromMultiple(partitionType, deletePartitions); + } else { + predicate = PartitionPredicate.ALWAYS_TRUE; + } + } + + Iterator<ManifestFileMeta> iterator = lsmFiles.iterator(); + while (iterator.hasNext()) { + ManifestFileMeta file = iterator.next(); + boolean small = file.fileSize() < suggestedMetaSize; + boolean inDeleteRange = + predicate != null + && predicate.test( + file.numAddedFiles() + file.numDeletedFiles(), + file.partitionStats().minValues(), + file.partitionStats().maxValues(), + file.partitionStats().nullCounts()); + if (small || inDeleteRange) { + iterator.remove(); + defaultCompactionManifests.put(file, new boolean[] {small, inDeleteRange}); + } + } + return new ClassifyResult(defaultCompactionManifests, lsmFiles, deleteEntries); + } + + /** + * Build level-sorted runs from a list of manifest files. Sorts files by min partition value, + * greedy-scans to build non-overlapping SortedRuns, then assigns levels by totalSize (Top-4 + * largest to level 1~4, rest to level 0). + */ + static List<ManifestSortedRun> buildLevelSortedRuns( + List<ManifestFileMeta> input, int sortFieldIndex, DataType sortFieldType) { + // Step 1: Sort by min value (if equal, then by max value) + input.sort( + (a, b) -> { + int cmp = + compareField( + a.partitionStats().minValues(), + b.partitionStats().minValues(), + sortFieldIndex, + sortFieldType); + if (cmp != 0) { + return cmp; + } + return compareField( + a.partitionStats().maxValues(), + b.partitionStats().maxValues(), + sortFieldIndex, + sortFieldType); + }); + + // Step 2: Interval graph coloring algorithm - assign files to runs + // Use priority queue to track runs by their max values + PriorityQueue<List<ManifestFileMeta>> runs = + new PriorityQueue<>( + (r1, r2) -> { + ManifestFileMeta last1 = r1.get(r1.size() - 1); + ManifestFileMeta last2 = r2.get(r2.size() - 1); + return compareField( + last1.partitionStats().maxValues(), + last2.partitionStats().maxValues(), + sortFieldIndex, + sortFieldType); + }); + + for (ManifestFileMeta file : input) { + boolean addedToExisting = false; + + // Try to find a run where current file's min >= run's max + if (!runs.isEmpty()) { + List<ManifestFileMeta> earliestRun = runs.peek(); + ManifestFileMeta last = earliestRun.get(earliestRun.size() - 1); + + if (compareField( + file.partitionStats().minValues(), + last.partitionStats().maxValues(), + sortFieldIndex, + sortFieldType) + >= 0) { + // Current file can be added to this run + runs.poll(); + earliestRun.add(file); + runs.offer(earliestRun); + addedToExisting = true; + } + } + + if (!addedToExisting) { + // Create a new run + List<ManifestFileMeta> newRun = new ArrayList<>(); + newRun.add(file); + runs.offer(newRun); + } + } + + // Step 3: Convert to ManifestSortedRun list + List<ManifestSortedRun> result = new ArrayList<>(); + while (!runs.isEmpty()) { + result.add(ManifestSortedRun.fromSorted(runs.poll())); + } + + // Step 4: Sort by totalSize and assign levels + result.sort(Comparator.comparingLong(ManifestSortedRun::totalSize)); + int n = result.size(); + int maxLevel = 4; + for (int i = 0; i < n; i++) { + if (i >= n - maxLevel) { + result.get(i).setLevel(i - (n - maxLevel) + 1); + } else { + result.get(i).setLevel(0); + } + } + return result; + } + + /** + * Split picked files into sections. Files with overlapping sort-key intervals go into the same + * section. Each section is built with pre-computed totalSize and hasDefaultCompactMeta. + */ + static List<Section> splitIntoSections( + List<ManifestFileMeta> pickedFiles, + int sortFieldIndex, + DataType sortFieldType, + Map<ManifestFileMeta, boolean[]> defaultCompactionMap) { + pickedFiles.sort( + (a, b) -> { + int cmp = + compareField( + a.partitionStats().minValues(), + b.partitionStats().minValues(), + sortFieldIndex, + sortFieldType); + if (cmp != 0) { + return cmp; + } + return compareField( + a.partitionStats().maxValues(), + b.partitionStats().maxValues(), + sortFieldIndex, + sortFieldType); + }); + + List<Section> sections = new ArrayList<>(); + List<ManifestFileMeta> currentFiles = new ArrayList<>(); + long currentTotalSize = 0; + boolean currentHasDefault = false; + ManifestFileMeta first = pickedFiles.get(0); + currentFiles.add(first); + currentTotalSize += first.fileSize(); + currentHasDefault = defaultCompactionMap.containsKey(first); + BinaryRow sectionMaxBound = first.partitionStats().maxValues(); + + for (int i = 1; i < pickedFiles.size(); i++) { + ManifestFileMeta file = pickedFiles.get(i); + if (compareField( + file.partitionStats().minValues(), + sectionMaxBound, + sortFieldIndex, + sortFieldType) + >= 0) { + sections.add(new Section(currentFiles, currentTotalSize, currentHasDefault)); + currentFiles = new ArrayList<>(); + currentTotalSize = 0; + currentFiles.add(file); + currentTotalSize += file.fileSize(); + currentHasDefault = defaultCompactionMap.containsKey(file); + sectionMaxBound = file.partitionStats().maxValues(); + } else { + currentFiles.add(file); + currentTotalSize += file.fileSize(); + if (!currentHasDefault && defaultCompactionMap.containsKey(file)) { + currentHasDefault = true; + } + if (compareField( + file.partitionStats().maxValues(), + sectionMaxBound, + sortFieldIndex, + sortFieldType) + > 0) { + sectionMaxBound = file.partitionStats().maxValues(); + } + } + } + sections.add(new Section(currentFiles, currentTotalSize, currentHasDefault)); + return sections; + } + + /** + * Merge small adjacent sections to avoid producing too many small rewrite batches. If either + * the pending section or the current section total size is smaller than half of {@code + * suggestedMetaSize}, they are combined into a single section. + */ + private static List<Section> mergeSmallAdjacentSections( + List<Section> sections, long suggestedMetaSize) { + List<Section> merged = new ArrayList<>(); + Section pending = null; + + for (Section section : sections) { + if (pending == null) { + pending = section; + } else { + if (pending.totalSize < suggestedMetaSize + || section.totalSize < suggestedMetaSize) { + pending = Section.merge(pending, section); + } else { + merged.add(pending); + pending = section; + } + } + } + if (pending != null) { + merged.add(pending); + } + return merged; + } + + /** + * Iterate over sections, decide whether to rewrite each section fully or partially based on the + * maxRewriteSize threshold and whether the section contains defaultCompaction files. + * + * <p>Within threshold: read all metas, sort and rewrite the entire section. Exceeds threshold + * but contains defaultCompaction files: only rewrite sub-segments around those files. Exceeds + * threshold with no defaultCompaction files: skip (keep as-is). + */ + private static void rewriteSections( + List<Section> sections, + Map<ManifestFileMeta, boolean[]> defaultCompactionMap, + ManifestFile manifestFile, + int sortFieldIndex, + DataType sortFieldType, + Set<FileEntry.Identifier> deleteEntries, + long suggestedMetaSize, + long maxRewriteSize, + List<ManifestFileMeta> result, + List<ManifestFileMeta> sortNewFiles, + @Nullable Integer manifestReadParallelism) + throws Exception { + long processedSize = 0; + boolean reachedLimit = false; + + for (int i = 0; i < sections.size(); i++) { + Section section = sections.get(i); + if (section.files.size() == 1) { + sortAndRewriteSection( + section.files, + manifestFile, + sortFieldIndex, + sortFieldType, + deleteEntries, + defaultCompactionMap, + result, + sortNewFiles, + manifestReadParallelism); + continue; + } + + if (processedSize + section.totalSize <= maxRewriteSize) { + processedSize += section.totalSize; + sortAndRewriteSection( + section.files, + manifestFile, + sortFieldIndex, + sortFieldType, + deleteEntries, + defaultCompactionMap, + result, + sortNewFiles, + manifestReadParallelism); + } else if (!reachedLimit) { + // First time exceeding threshold without defaultCompaction: + // partial rewrite within remaining budget. + long rewriteTotalSize = maxRewriteSize - processedSize; + processedSize += section.totalSize; + // Split section into two parts: files within budget and remaining files + List<ManifestFileMeta> rewriteFiles = new ArrayList<>(); + List<ManifestFileMeta> remainingFiles = new ArrayList<>(); + long rewriteSize = 0; + long remainingSize = 0; + boolean remainingHasDefault = false; + + for (ManifestFileMeta file : section.files) { + if (rewriteSize + file.fileSize() <= rewriteTotalSize) { + rewriteFiles.add(file); + rewriteSize += file.fileSize(); + } else { + remainingFiles.add(file); + remainingSize += file.fileSize(); + if (defaultCompactionMap.containsKey(file)) { + remainingHasDefault = true; + } + } + } + + sortAndRewriteSection( + rewriteFiles, + manifestFile, + sortFieldIndex, + sortFieldType, + deleteEntries, + defaultCompactionMap, + result, + sortNewFiles, + manifestReadParallelism); + + // Create new section for remaining files and append to sections list + if (!remainingFiles.isEmpty()) { + Section remainingSection = + new Section(remainingFiles, remainingSize, remainingHasDefault); + // Append remaining section to the end of sections list + sections.add(remainingSection); + } + reachedLimit = true; + } else if (section.hasDefaultCompactMeta) { + rewriteSubSegments( + section.files, + defaultCompactionMap, + manifestFile, + sortFieldIndex, + sortFieldType, + deleteEntries, + suggestedMetaSize, + result, + sortNewFiles, + manifestReadParallelism); + } else { + result.addAll(section.files); + } + } + } + + /** Rewrite sub-segments within a section that exceeds the rewrite threshold. */ + private static void rewriteSubSegments( + List<ManifestFileMeta> section, + Map<ManifestFileMeta, boolean[]> defaultCompactionMap, + ManifestFile manifestFile, + int sortFieldIndex, + DataType sortFieldType, + @Nullable Set<FileEntry.Identifier> deleteEntries, + long manifestTargetSize, + List<ManifestFileMeta> result, + List<ManifestFileMeta> sortNewFiles, + @Nullable Integer manifestReadParallelism) + throws Exception { + List<ManifestFileMeta> subSegment = new ArrayList<>(); + long subSegmentSize = 0; + for (ManifestFileMeta m : section) { + subSegmentSize += m.fileSize(); + subSegment.add(m); + + if (subSegmentSize >= manifestTargetSize) { + sortAndRewriteSection( + subSegment, + manifestFile, + sortFieldIndex, + sortFieldType, + deleteEntries, + defaultCompactionMap, + result, + sortNewFiles, + manifestReadParallelism); + subSegment.clear(); + subSegmentSize = 0; + } + } + // Flush remaining sub-segment + if (!subSegment.isEmpty()) { + sortAndRewriteSection( + subSegment, + manifestFile, + sortFieldIndex, + sortFieldType, + deleteEntries, + defaultCompactionMap, + result, + sortNewFiles, + manifestReadParallelism); + } + } + + /** + * Read all entries from a section's manifest files, sort them in memory by the specified + * partition field, filter out DELETE entries and cancelled ADD entries, then write surviving + * entries to new manifest files via the rolling writer. + * + * <p>All files participate in sorting, enabling full sort across the entire section. + * + * <p>Reading is parallelized via {@code sequentialBatchedExecute} following the same pattern as + * {@link ManifestFileMerger#tryFullCompaction}. + */ + private static void sortAndRewriteSection( + List<ManifestFileMeta> section, + ManifestFile manifestFile, + int sortFieldIndex, + DataType sortFieldType, + Set<FileEntry.Identifier> deletedIdentifiers, + Map<ManifestFileMeta, boolean[]> defaultCompactionMap, + List<ManifestFileMeta> result, + List<ManifestFileMeta> sortNewFiles, + @Nullable Integer manifestReadParallelism) + throws Exception { + if (section.size() == 1 + && (!defaultCompactionMap.containsKey(section.get(0)) + || !defaultCompactionMap.get(section.get(0))[1])) { + result.add(section.get(0)); + return; + } + // Parallel read: each meta is read independently + Function<ManifestFileMeta, List<FullCompactionReadResult>> reader = + meta -> singletonList(readForSortRewrite(meta, manifestFile, deletedIdentifiers)); + + List<ManifestEntry> entriesToRewrite = new ArrayList<>(); + for (FullCompactionReadResult readResult : + sequentialBatchedExecute(reader, section, manifestReadParallelism)) { + entriesToRewrite.addAll(readResult.entries); + } + + if (!entriesToRewrite.isEmpty()) { + entriesToRewrite.sort((a, b) -> compareSortKey(a, b, sortFieldIndex, sortFieldType)); + + RollingFileWriter<ManifestEntry, ManifestFileMeta> writer = + manifestFile.createRollingWriter(); + Exception exception = null; + try { + writer.write(entriesToRewrite); + } catch (Exception e) { + exception = e; + } finally { + if (exception != null) { + writer.abort(); + throw exception; + } + writer.close(); + } + List<ManifestFileMeta> sorted = writer.result(); + result.addAll(sorted); + sortNewFiles.addAll(sorted); + } + } + + /** + * Compare two {@link ManifestEntry}s by the composite key {@code (sort-field, fileName)}. + * {@code fileName} is used as the tie-breaker so that all entries sharing the same sort-field + * value AND the same data file are emitted contiguously. + */ + static int compareSortKey( + ManifestEntry a, ManifestEntry b, int sortFieldIndex, DataType sortFieldType) { + int c = compareField(a.partition(), b.partition(), sortFieldIndex, sortFieldType); + if (c != 0) { + return c; + } + // ADD before DELETE + int kindCmp = a.kind().compareTo(b.kind()); + if (kindCmp != 0) { + return kindCmp; + } + return a.file().fileName().compareTo(b.file().fileName()); + } + + /** + * Compares the value at field {@code k} of two {@link BinaryRow}s according to {@code type}. + */ + static int compareField(BinaryRow a, BinaryRow b, int k, DataType type) { Review Comment: Nice point. ########## paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileSorter.java: ########## @@ -0,0 +1,785 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.io.RollingFileWriter; +import org.apache.paimon.manifest.FileEntry; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ManifestFile; +import org.apache.paimon.manifest.ManifestFileMeta; +import org.apache.paimon.operation.ManifestFileMerger.FullCompactionReadResult; +import org.apache.paimon.partition.PartitionPredicate; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Filter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.function.Function; + +import static java.util.Collections.singletonList; +import static org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute; + +/** Manifest file sorter that sorts and rewrites manifest files by a configured partition field. */ +public class ManifestFileSorter { + + private static final Logger LOG = LoggerFactory.getLogger(ManifestFileSorter.class); + + /** + * Try to sort-rewrite the merged manifest list by a configured partition field. If the sort + * field cannot be resolved or the delta file size is below the full compaction threshold, the + * input is returned as-is. + */ + static Optional<List<ManifestFileMeta>> trySortRewrite( + List<ManifestFileMeta> input, + List<ManifestFileMeta> newFilesForAbort, + ManifestFile manifestFile, + RowType partitionType, + CoreOptions options) + throws Exception { + // Extract configuration from options + long suggestedMetaSize = options.manifestTargetSize().getBytes(); + Integer manifestReadParallelism = options.scanManifestParallelism(); + String sortPartitionField = options.manifestSortPartitionField(); + // Step 1: Resolve sort field. + String sortField = resolveSortField(sortPartitionField, partitionType); + if (sortField == null) { + throw new IllegalArgumentException( + "Cannot resolve sort field for manifest sort rewrite."); + } + int sortFieldIndex = partitionType.getFieldNames().indexOf(sortField); + DataType sortFieldType = partitionType.getTypeAt(sortFieldIndex); + + // Step 2: Classify manifests into defaultCompaction and LSM. + ClassifyResult classified = + classifyManifests( + input, + suggestedMetaSize, + manifestFile, + partitionType, + manifestReadParallelism); + Map<ManifestFileMeta, boolean[]> defaultCompactionMap = + classified.defaultCompactionManifests; + List<ManifestFileMeta> lsmFiles = classified.lsmFiles; + Set<FileEntry.Identifier> deleteEntries = classified.deleteEntries; + + // Step 3: Build LSM Tree and assign levels (only for lsmFiles). + List<ManifestSortedRun> levelRuns = + lsmFiles.isEmpty() + ? new ArrayList<>() + : buildLevelSortedRuns(lsmFiles, sortFieldIndex, sortFieldType); + + // Step 4: Pick runs to compact. + ManifestPickStrategy pickStrategy = + new ManifestPickStrategy( + options.maxSizeAmplificationPercent(), options.sortedRunSizeRatio()); + List<ManifestSortedRun> pickedRuns = pickStrategy.pick(levelRuns); + + if (pickedRuns.isEmpty() && defaultCompactionMap.isEmpty()) { + LOG.debug( + "Manifest sort rewrite skipped: no runs picked and no defaultCompaction files."); + return Optional.empty(); + } + + LOG.info( + "Manifest sort rewrite: input={} files, lsm={} runs, picked={} runs, " + + "defaultCompaction={} files.", + input.size(), + levelRuns.size(), + pickedRuns.size(), + defaultCompactionMap.size()); + + Set<ManifestSortedRun> pickedSet = new HashSet<>(pickedRuns); + List<ManifestFileMeta> reusedFiles = new ArrayList<>(); + for (ManifestSortedRun run : levelRuns) { + if (!pickedSet.contains(run)) { + reusedFiles.addAll(run.files()); + } + } + List<ManifestFileMeta> result = new ArrayList<>(reusedFiles); + + // Step 5: Split picked files into sections, sort and rewrite each. + List<ManifestFileMeta> pickedFiles = new ArrayList<>(); + for (ManifestSortedRun run : pickedRuns) { + pickedFiles.addAll(run.files()); + } + pickedFiles.addAll(defaultCompactionMap.keySet()); + + List<Section> sections = + splitIntoSections(pickedFiles, sortFieldIndex, sortFieldType, defaultCompactionMap); + sections = mergeSmallAdjacentSections(sections, suggestedMetaSize); + + rewriteSections( + sections, + defaultCompactionMap, + manifestFile, + sortFieldIndex, + sortFieldType, + deleteEntries, + suggestedMetaSize, + options.manifestSortMaxRewriteSize(), + result, + newFilesForAbort, + manifestReadParallelism); + + LOG.info( + "Manifest sort rewrite completed: sections={}, newFiles={}, resultFiles={}.", + sections.size(), + newFilesForAbort.size(), + result.size()); + return Optional.of(result); + } + + /** + * Classify manifest files into default-compaction group and LSM group. + * + * <p>When full compaction is triggered (totalDeltaFileSize >= threshold), files that must + * change or overlap with delete partitions go into defaultCompactionManifests; the rest stay as + * lsmFiles. + * + * <p>When full compaction is NOT triggered, adjacent small manifests whose cumulative size + * reaches suggestedMetaSize are grouped into defaultCompactionManifests (minor-style pick). + */ + private static ClassifyResult classifyManifests( + List<ManifestFileMeta> input, + long suggestedMetaSize, + ManifestFile manifestFile, + RowType partitionType, + @Nullable Integer manifestReadParallelism) { + Map<ManifestFileMeta, boolean[]> defaultCompactionManifests = new LinkedHashMap<>(); + List<ManifestFileMeta> lsmFiles = new LinkedList<>(input); + Set<FileEntry.Identifier> deleteEntries = + FileEntry.readDeletedEntries(manifestFile, input, manifestReadParallelism); + + PartitionPredicate predicate; + if (deleteEntries.isEmpty()) { + predicate = PartitionPredicate.ALWAYS_FALSE; + } else { + if (partitionType.getFieldCount() > 0) { + Set<BinaryRow> deletePartitions = + ManifestFileMerger.computeDeletePartitions(deleteEntries); + predicate = PartitionPredicate.fromMultiple(partitionType, deletePartitions); + } else { + predicate = PartitionPredicate.ALWAYS_TRUE; + } + } + + Iterator<ManifestFileMeta> iterator = lsmFiles.iterator(); + while (iterator.hasNext()) { + ManifestFileMeta file = iterator.next(); + boolean small = file.fileSize() < suggestedMetaSize; + boolean inDeleteRange = + predicate != null + && predicate.test( + file.numAddedFiles() + file.numDeletedFiles(), + file.partitionStats().minValues(), + file.partitionStats().maxValues(), + file.partitionStats().nullCounts()); + if (small || inDeleteRange) { + iterator.remove(); + defaultCompactionManifests.put(file, new boolean[] {small, inDeleteRange}); + } + } + return new ClassifyResult(defaultCompactionManifests, lsmFiles, deleteEntries); + } + + /** + * Build level-sorted runs from a list of manifest files. Sorts files by min partition value, + * greedy-scans to build non-overlapping SortedRuns, then assigns levels by totalSize (Top-4 + * largest to level 1~4, rest to level 0). + */ + static List<ManifestSortedRun> buildLevelSortedRuns( + List<ManifestFileMeta> input, int sortFieldIndex, DataType sortFieldType) { + // Step 1: Sort by min value (if equal, then by max value) + input.sort( + (a, b) -> { + int cmp = + compareField( + a.partitionStats().minValues(), + b.partitionStats().minValues(), + sortFieldIndex, + sortFieldType); + if (cmp != 0) { + return cmp; + } + return compareField( + a.partitionStats().maxValues(), + b.partitionStats().maxValues(), + sortFieldIndex, + sortFieldType); + }); + + // Step 2: Interval graph coloring algorithm - assign files to runs + // Use priority queue to track runs by their max values + PriorityQueue<List<ManifestFileMeta>> runs = + new PriorityQueue<>( + (r1, r2) -> { + ManifestFileMeta last1 = r1.get(r1.size() - 1); + ManifestFileMeta last2 = r2.get(r2.size() - 1); + return compareField( + last1.partitionStats().maxValues(), + last2.partitionStats().maxValues(), + sortFieldIndex, + sortFieldType); + }); + + for (ManifestFileMeta file : input) { + boolean addedToExisting = false; + + // Try to find a run where current file's min >= run's max + if (!runs.isEmpty()) { + List<ManifestFileMeta> earliestRun = runs.peek(); + ManifestFileMeta last = earliestRun.get(earliestRun.size() - 1); + + if (compareField( + file.partitionStats().minValues(), + last.partitionStats().maxValues(), + sortFieldIndex, + sortFieldType) + >= 0) { + // Current file can be added to this run + runs.poll(); + earliestRun.add(file); + runs.offer(earliestRun); + addedToExisting = true; + } + } + + if (!addedToExisting) { Review Comment: changed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
