leaves12138 commented on code in PR #7842:
URL: https://github.com/apache/paimon/pull/7842#discussion_r3263390733


##########
paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileSorter.java:
##########
@@ -0,0 +1,785 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.manifest.FileEntry;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.operation.ManifestFileMerger.FullCompactionReadResult;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Filter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.function.Function;
+
+import static java.util.Collections.singletonList;
+import static 
org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
+
+/** Manifest file sorter that sorts and rewrites manifest files by a 
configured partition field. */
+public class ManifestFileSorter {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ManifestFileSorter.class);
+
+    /**
+     * Try to sort-rewrite the merged manifest list by a configured partition 
field. If the sort
+     * field cannot be resolved or the delta file size is below the full 
compaction threshold, the
+     * input is returned as-is.
+     */
+    static Optional<List<ManifestFileMeta>> trySortRewrite(
+            List<ManifestFileMeta> input,
+            List<ManifestFileMeta> newFilesForAbort,
+            ManifestFile manifestFile,
+            RowType partitionType,
+            CoreOptions options)
+            throws Exception {
+        // Extract configuration from options
+        long suggestedMetaSize = options.manifestTargetSize().getBytes();
+        Integer manifestReadParallelism = options.scanManifestParallelism();
+        String sortPartitionField = options.manifestSortPartitionField();
+        // Step 1: Resolve sort field.
+        String sortField = resolveSortField(sortPartitionField, partitionType);
+        if (sortField == null) {
+            throw new IllegalArgumentException(
+                    "Cannot resolve sort field for manifest sort rewrite.");
+        }
+        int sortFieldIndex = partitionType.getFieldNames().indexOf(sortField);
+        DataType sortFieldType = partitionType.getTypeAt(sortFieldIndex);
+
+        // Step 2: Classify manifests into defaultCompaction and LSM.
+        ClassifyResult classified =
+                classifyManifests(
+                        input,
+                        suggestedMetaSize,
+                        manifestFile,
+                        partitionType,
+                        manifestReadParallelism);
+        Map<ManifestFileMeta, boolean[]> defaultCompactionMap =
+                classified.defaultCompactionManifests;
+        List<ManifestFileMeta> lsmFiles = classified.lsmFiles;
+        Set<FileEntry.Identifier> deleteEntries = classified.deleteEntries;
+
+        // Step 3: Build LSM Tree and assign levels (only for lsmFiles).
+        List<ManifestSortedRun> levelRuns =
+                lsmFiles.isEmpty()
+                        ? new ArrayList<>()
+                        : buildLevelSortedRuns(lsmFiles, sortFieldIndex, 
sortFieldType);
+
+        // Step 4: Pick runs to compact.
+        ManifestPickStrategy pickStrategy =
+                new ManifestPickStrategy(
+                        options.maxSizeAmplificationPercent(), 
options.sortedRunSizeRatio());
+        List<ManifestSortedRun> pickedRuns = pickStrategy.pick(levelRuns);
+
+        if (pickedRuns.isEmpty() && defaultCompactionMap.isEmpty()) {
+            LOG.debug(
+                    "Manifest sort rewrite skipped: no runs picked and no 
defaultCompaction files.");
+            return Optional.empty();
+        }
+
+        LOG.info(
+                "Manifest sort rewrite: input={} files, lsm={} runs, picked={} 
runs, "
+                        + "defaultCompaction={} files.",
+                input.size(),
+                levelRuns.size(),
+                pickedRuns.size(),
+                defaultCompactionMap.size());
+
+        Set<ManifestSortedRun> pickedSet = new HashSet<>(pickedRuns);
+        List<ManifestFileMeta> reusedFiles = new ArrayList<>();
+        for (ManifestSortedRun run : levelRuns) {
+            if (!pickedSet.contains(run)) {
+                reusedFiles.addAll(run.files());
+            }
+        }
+        List<ManifestFileMeta> result = new ArrayList<>(reusedFiles);
+
+        // Step 5: Split picked files into sections, sort and rewrite each.
+        List<ManifestFileMeta> pickedFiles = new ArrayList<>();
+        for (ManifestSortedRun run : pickedRuns) {
+            pickedFiles.addAll(run.files());
+        }
+        pickedFiles.addAll(defaultCompactionMap.keySet());
+
+        List<Section> sections =
+                splitIntoSections(pickedFiles, sortFieldIndex, sortFieldType, 
defaultCompactionMap);
+        sections = mergeSmallAdjacentSections(sections, suggestedMetaSize);
+
+        rewriteSections(
+                sections,
+                defaultCompactionMap,
+                manifestFile,
+                sortFieldIndex,
+                sortFieldType,
+                deleteEntries,
+                suggestedMetaSize,
+                options.manifestSortMaxRewriteSize(),
+                result,
+                newFilesForAbort,
+                manifestReadParallelism);
+
+        LOG.info(
+                "Manifest sort rewrite completed: sections={}, newFiles={}, 
resultFiles={}.",
+                sections.size(),
+                newFilesForAbort.size(),
+                result.size());
+        return Optional.of(result);
+    }
+
+    /**
+     * Classify manifest files into default-compaction group and LSM group.
+     *
+     * <p>When full compaction is triggered (totalDeltaFileSize >= threshold), 
files that must
+     * change or overlap with delete partitions go into 
defaultCompactionManifests; the rest stay as
+     * lsmFiles.
+     *
+     * <p>When full compaction is NOT triggered, adjacent small manifests 
whose cumulative size
+     * reaches suggestedMetaSize are grouped into defaultCompactionManifests 
(minor-style pick).
+     */
+    private static ClassifyResult classifyManifests(
+            List<ManifestFileMeta> input,
+            long suggestedMetaSize,
+            ManifestFile manifestFile,
+            RowType partitionType,
+            @Nullable Integer manifestReadParallelism) {
+        Map<ManifestFileMeta, boolean[]> defaultCompactionManifests = new 
LinkedHashMap<>();
+        List<ManifestFileMeta> lsmFiles = new LinkedList<>(input);
+        Set<FileEntry.Identifier> deleteEntries =
+                FileEntry.readDeletedEntries(manifestFile, input, 
manifestReadParallelism);
+
+        PartitionPredicate predicate;
+        if (deleteEntries.isEmpty()) {
+            predicate = PartitionPredicate.ALWAYS_FALSE;
+        } else {
+            if (partitionType.getFieldCount() > 0) {
+                Set<BinaryRow> deletePartitions =
+                        
ManifestFileMerger.computeDeletePartitions(deleteEntries);
+                predicate = PartitionPredicate.fromMultiple(partitionType, 
deletePartitions);
+            } else {
+                predicate = PartitionPredicate.ALWAYS_TRUE;
+            }
+        }
+
+        Iterator<ManifestFileMeta> iterator = lsmFiles.iterator();
+        while (iterator.hasNext()) {
+            ManifestFileMeta file = iterator.next();
+            boolean small = file.fileSize() < suggestedMetaSize;
+            boolean inDeleteRange =
+                    predicate != null
+                            && predicate.test(
+                                    file.numAddedFiles() + 
file.numDeletedFiles(),
+                                    file.partitionStats().minValues(),
+                                    file.partitionStats().maxValues(),
+                                    file.partitionStats().nullCounts());
+            if (small || inDeleteRange) {
+                iterator.remove();
+                defaultCompactionManifests.put(file, new boolean[] {small, 
inDeleteRange});
+            }
+        }
+        return new ClassifyResult(defaultCompactionManifests, lsmFiles, 
deleteEntries);
+    }
+
+    /**
+     * Build level-sorted runs from a list of manifest files. Sorts files by 
min partition value,
+     * greedy-scans to build non-overlapping SortedRuns, then assigns levels 
by totalSize (Top-4
+     * largest to level 1~4, rest to level 0).
+     */
+    static List<ManifestSortedRun> buildLevelSortedRuns(
+            List<ManifestFileMeta> input, int sortFieldIndex, DataType 
sortFieldType) {
+        // Step 1: Sort by min value (if equal, then by max value)
+        input.sort(
+                (a, b) -> {
+                    int cmp =
+                            compareField(
+                                    a.partitionStats().minValues(),
+                                    b.partitionStats().minValues(),
+                                    sortFieldIndex,
+                                    sortFieldType);
+                    if (cmp != 0) {
+                        return cmp;
+                    }
+                    return compareField(
+                            a.partitionStats().maxValues(),
+                            b.partitionStats().maxValues(),
+                            sortFieldIndex,
+                            sortFieldType);
+                });
+
+        // Step 2: Interval graph coloring algorithm - assign files to runs
+        // Use priority queue to track runs by their max values
+        PriorityQueue<List<ManifestFileMeta>> runs =
+                new PriorityQueue<>(
+                        (r1, r2) -> {
+                            ManifestFileMeta last1 = r1.get(r1.size() - 1);
+                            ManifestFileMeta last2 = r2.get(r2.size() - 1);
+                            return compareField(
+                                    last1.partitionStats().maxValues(),
+                                    last2.partitionStats().maxValues(),
+                                    sortFieldIndex,
+                                    sortFieldType);
+                        });
+
+        for (ManifestFileMeta file : input) {
+            boolean addedToExisting = false;
+
+            // Try to find a run where current file's min >= run's max
+            if (!runs.isEmpty()) {
+                List<ManifestFileMeta> earliestRun = runs.peek();
+                ManifestFileMeta last = earliestRun.get(earliestRun.size() - 
1);
+
+                if (compareField(
+                                file.partitionStats().minValues(),
+                                last.partitionStats().maxValues(),
+                                sortFieldIndex,
+                                sortFieldType)
+                        >= 0) {
+                    // Current file can be added to this run
+                    runs.poll();
+                    earliestRun.add(file);
+                    runs.offer(earliestRun);
+                    addedToExisting = true;
+                }
+            }
+
+            if (!addedToExisting) {

Review Comment:
   Do not use boolean addedToExisting.
   
   Just
   List<ManifestFileMeta> earliestRun = runs.poll();
   if (earliestRun == null) {
       do something
   } else if (compare(xxx) > 0) {
       do something
   } else {
       do something
   }
   
   It makes the code cleaner.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to