This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 21f6a44889 [core] Optimize calculate for DataEvolutionCompactCoordinator (#6867)
21f6a44889 is described below

commit 21f6a448895f0e2f2d723005f0e9586502797bfb
Author: YeJunHao <[email protected]>
AuthorDate: Tue Dec 23 17:29:55 2025 +0800

    [core] Optimize calculate for DataEvolutionCompactCoordinator (#6867)
---
 .../DataEvolutionCompactCoordinator.java           | 197 +++++++++++----------
 1 file changed, 108 insertions(+), 89 deletions(-)
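
For context on the change below: the new compactPlan no longer walks a single
rowId-sorted stream of entries; it first buckets files per partition and then
uses RangeHelper.mergeOverlappingRanges (a helper not shown in this diff) to
cluster files by row-id range. A minimal sketch of what such a merge could look
like is given here; the class name RangeMergeSketch and its sweep-based logic
are illustrative assumptions, not the actual RangeHelper implementation.

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.function.ToLongFunction;

    /** Illustrative sketch only: group elements whose row-id ranges overlap or touch. */
    final class RangeMergeSketch<T> {

        private final ToLongFunction<T> start;
        private final ToLongFunction<T> end;

        RangeMergeSketch(ToLongFunction<T> start, ToLongFunction<T> end) {
            this.start = start;
            this.end = end;
        }

        List<List<T>> mergeOverlappingRanges(List<T> items) {
            List<T> sorted = new ArrayList<>(items);
            sorted.sort(Comparator.comparingLong(start));

            List<List<T>> groups = new ArrayList<>();
            List<T> current = new ArrayList<>();
            long currentEnd = Long.MIN_VALUE;
            for (T item : sorted) {
                if (!current.isEmpty() && start.applyAsLong(item) > currentEnd) {
                    // gap before this range: close the current group, start a new one
                    groups.add(current);
                    current = new ArrayList<>();
                    currentEnd = Long.MIN_VALUE;
                }
                current.add(item);
                currentEnd = Math.max(currentEnd, end.applyAsLong(item));
            }
            if (!current.isEmpty()) {
                groups.add(current);
            }
            return groups;
        }
    }

Under that reading, the two end extractors passed in the patch behave differently:
with f.nonNullFirstRowId() + f.rowCount() (exclusive end) adjacent files fall into
one group, while with f.nonNullFirstRowId() + f.rowCount() - 1 (inclusive last row
id) only files that actually share row ids are grouped together.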

diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
index adc08a4bc8..8aef7b0a6f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
@@ -21,7 +21,6 @@ package org.apache.paimon.append.dataevolution;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.format.blob.BlobFileFormat;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
@@ -36,12 +35,15 @@ import javax.annotation.Nullable;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
+import java.util.TreeMap;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
 
 /** Compact coordinator to compact data evolution table. */
 public class DataEvolutionCompactCoordinator {
@@ -107,12 +109,6 @@ public class DataEvolutionCompactCoordinator {
                         currentMetas.stream()
                                 .flatMap(meta -> snapshotReader.readManifest(meta).stream())
                                 .collect(Collectors.toList());
-                Comparator<ManifestEntry> comparator =
-                        Comparator.comparingLong((ManifestEntry a) -> a.file().nonNullFirstRowId())
-                                .thenComparingInt(
-                                        a -> BlobFileFormat.isBlobFile(a.fileName()) ? 1 : 0);
-                targetEntries.sort(comparator);
-
                 result.addAll(targetEntries);
             }
             return result;
@@ -126,14 +122,6 @@ public class DataEvolutionCompactCoordinator {
         private final long targetFileSize;
         private final long openFileCost;
         private final long compactMinFileNum;
-        private long lastRowIdStart = -1;
-        private long nextRowIdExpected = -1;
-        private long weightSum = 0L;
-        private BinaryRow lastPartition = null;
-        private boolean skipFile = false;
-        private List<DataEvolutionCompactTask> tasks = new ArrayList<>();
-        private List<DataFileMeta> groupFiles = new ArrayList<>();
-        private List<DataFileMeta> blobFiles = new ArrayList<>();
 
         CompactPlanner(
                 boolean compactBlob,
@@ -146,90 +134,121 @@ public class DataEvolutionCompactCoordinator {
             this.compactMinFileNum = compactMinFileNum;
         }
 
-        List<DataEvolutionCompactTask> compactPlan(Iterable<ManifestEntry> entries) {
-            for (ManifestEntry entry : entries) {
-                long rowId = entry.file().nonNullFirstRowId();
-                if (rowId < lastRowIdStart) {
-                    throw new IllegalStateException(
-                            "Files are not in order by rowId. Current file rowId: "
-                                    + rowId
-                                    + ", last file rowId: "
-                                    + lastRowIdStart);
-                } else if (rowId == lastRowIdStart) {
-                    checkArgument(
-                            lastPartition.equals(entry.partition()),
-                            "Inconsistent partition for the same rowId: " + rowId);
-                    if (!skipFile) {
-                        if (BlobFileFormat.isBlobFile(entry.fileName())) {
-                            blobFiles.add(entry.file());
+        List<DataEvolutionCompactTask> compactPlan(List<ManifestEntry> input) {
+            List<DataEvolutionCompactTask> tasks = new ArrayList<>();
+            Map<BinaryRow, List<DataFileMeta>> partitionedFiles = new LinkedHashMap<>();
+            for (ManifestEntry entry : input) {
+                partitionedFiles
+                        .computeIfAbsent(entry.partition(), k -> new ArrayList<>())
+                        .add(entry.file());
+            }
+
+            for (Map.Entry<BinaryRow, List<DataFileMeta>> partitionFiles :
+                    partitionedFiles.entrySet()) {
+                BinaryRow partition = partitionFiles.getKey();
+                List<DataFileMeta> files = partitionFiles.getValue();
+                RangeHelper<DataFileMeta> rangeHelper =
+                        new RangeHelper<>(
+                                DataFileMeta::nonNullFirstRowId,
+                                // merge adjacent files
+                                f -> f.nonNullFirstRowId() + f.rowCount());
+
+                List<List<DataFileMeta>> ranges = rangeHelper.mergeOverlappingRanges(files);
+
+                for (List<DataFileMeta> group : ranges) {
+                    List<DataFileMeta> dataFiles = new ArrayList<>();
+                    List<DataFileMeta> blobFiles = new ArrayList<>();
+                    TreeMap<Long, DataFileMeta> treeMap = new TreeMap<>();
+                    Map<DataFileMeta, List<DataFileMeta>> dataFileToBlobFiles = new HashMap<>();
+                    for (DataFileMeta f : group) {
+                        if (!isBlobFile(f.fileName())) {
+                            treeMap.put(f.nonNullFirstRowId(), f);
+                            dataFiles.add(f);
                         } else {
-                            groupFiles.add(entry.file());
-                            weightSum += Math.max(entry.file().fileSize(), openFileCost);
+                            blobFiles.add(f);
                         }
                     }
-                } else if (rowId < nextRowIdExpected) {
-                    checkArgument(
-                            lastPartition.equals(entry.partition()),
-                            "Inconsistent partition for the same rowId: " + rowId);
-                    checkArgument(
-                            BlobFileFormat.isBlobFile(entry.fileName()),
-                            "Data file found in the middle of blob files for rowId: " + rowId);
-                    if (!skipFile) {
-                        blobFiles.add(entry.file());
-                    }
-                } else {
-                    BinaryRow currentPartition = entry.partition();
-                    long currentWeight = Math.max(entry.file().fileSize(), openFileCost);
-                    // skip big file
-                    skipFile = currentWeight > targetFileSize;
-
-                    // If compaction condition meets, do compaction
-                    if (weightSum > targetFileSize
-                            || rowId > nextRowIdExpected
-                            || !currentPartition.equals(lastPartition)
-                            || skipFile) {
-                        flushAll();
+
+                    if (compactBlob) {
+                        // associate blob files to data files
+                        for (DataFileMeta blobFile : blobFiles) {
+                            Long key = treeMap.floorKey(blobFile.nonNullFirstRowId());
+                            if (key != null) {
+                                DataFileMeta dataFile = treeMap.get(key);
+                                if (blobFile.nonNullFirstRowId() >= dataFile.nonNullFirstRowId()
+                                        && blobFile.nonNullFirstRowId()
+                                                <= dataFile.nonNullFirstRowId()
+                                                        + dataFile.rowCount()
+                                                        - 1) {
+                                    dataFileToBlobFiles
+                                            .computeIfAbsent(dataFile, k -> new ArrayList<>())
+                                            .add(blobFile);
+                                }
+                            }
+                        }
                     }
 
-                    if (!skipFile) {
-                        weightSum += currentWeight;
-                        groupFiles.add(entry.file());
+                    RangeHelper<DataFileMeta> rangeHelper2 =
+                            new RangeHelper<>(
+                                    DataFileMeta::nonNullFirstRowId,
+                                    // files group
+                                    f -> f.nonNullFirstRowId() + f.rowCount() - 1);
+                    List<List<DataFileMeta>> groupedFiles =
+                            rangeHelper2.mergeOverlappingRanges(dataFiles);
+                    List<DataFileMeta> waitCompactFiles = new ArrayList<>();
+
+                    long weightSum = 0L;
+                    for (List<DataFileMeta> fileGroup : groupedFiles) {
+                        long currentGroupWeight =
+                                fileGroup.stream()
+                                        .mapToLong(d -> Math.max(d.fileSize(), openFileCost))
+                                        .sum();
+                        if (currentGroupWeight > targetFileSize) {
+                            // compact current file group to merge field files
+                            tasks.addAll(triggerTask(fileGroup, partition, dataFileToBlobFiles));
+                            // compact wait compact files
+                            tasks.addAll(
+                                    triggerTask(waitCompactFiles, partition, dataFileToBlobFiles));
+                            waitCompactFiles = new ArrayList<>();
+                            weightSum = 0;
+                        } else {
+                            weightSum += currentGroupWeight;
+                            waitCompactFiles.addAll(fileGroup);
+                            if (weightSum > targetFileSize) {
+                                tasks.addAll(
+                                        triggerTask(
+                                                waitCompactFiles, partition, dataFileToBlobFiles));
+                                waitCompactFiles = new ArrayList<>();
+                                weightSum = 0L;
+                            }
+                        }
                     }
-                    lastRowIdStart = rowId;
-                    nextRowIdExpected = rowId + entry.file().rowCount();
-                    lastPartition = currentPartition;
+                    tasks.addAll(triggerTask(waitCompactFiles, partition, dataFileToBlobFiles));
                 }
             }
-            // do compaction for the last group
-            flushAll();
-
-            List<DataEvolutionCompactTask> result = new ArrayList<>(tasks);
-            tasks = new ArrayList<>();
-            return result;
+            return tasks;
         }
 
-        private void flushAll() {
-            if (!groupFiles.isEmpty()) {
-                if (groupFiles.size() >= compactMinFileNum) {
-                    tasks.add(
-                            new DataEvolutionCompactTask(
-                                    lastPartition, new ArrayList<>(groupFiles), false));
-
-                    if (compactBlob && blobFiles.size() > 1) {
-                        tasks.add(
-                                new DataEvolutionCompactTask(
-                                        lastPartition, new ArrayList<>(blobFiles), true));
-                    }
-                }
-
-                weightSum = 0L;
-                groupFiles = new ArrayList<>();
-                blobFiles = new ArrayList<>();
+        private List<DataEvolutionCompactTask> triggerTask(
+                List<DataFileMeta> dataFiles,
+                BinaryRow partition,
+                Map<DataFileMeta, List<DataFileMeta>> dataFileToBlobFiles) {
+            List<DataEvolutionCompactTask> tasks = new ArrayList<>();
+            if (dataFiles.size() >= compactMinFileNum) {
+                tasks.add(new DataEvolutionCompactTask(partition, dataFiles, false));
             }
 
-            checkArgument(weightSum == 0L, "Weight sum should be zero after compaction.");
-            checkArgument(groupFiles.isEmpty(), "Group files should be empty.");
-            checkArgument(blobFiles.isEmpty(), "Blob files should be empty.");
+            if (compactBlob) {
+                List<DataFileMeta> blobFiles = new ArrayList<>();
+                for (DataFileMeta dataFile : dataFiles) {
+                    blobFiles.addAll(
+                            dataFileToBlobFiles.getOrDefault(dataFile, Collections.emptyList()));
+                }
+                if (blobFiles.size() >= compactMinFileNum) {
+                    tasks.add(new DataEvolutionCompactTask(partition, blobFiles, true));
+                }
+            }
+            return tasks;
         }
     }
 }
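
The blob handling in the new compactPlan boils down to: when compactBlob is
enabled, every non-blob data file is indexed by its first row id in a TreeMap,
each blob file is attached to the data file found via floorKey when the blob's
first row id falls inside that data file's row-id range, and triggerTask only
emits a blob task once at least compactMinFileNum associated blob files have
accumulated. A self-contained sketch of that association follows; FileRange is
a hypothetical stand-in for DataFileMeta, used here purely for illustration.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    /** Illustrative sketch of the floorKey-style association used above. */
    final class BlobAssociationSketch {

        /** Hypothetical stand-in for DataFileMeta: name, first row id, row count. */
        static final class FileRange {
            final String name;
            final long firstRowId;
            final long rowCount;

            FileRange(String name, long firstRowId, long rowCount) {
                this.name = name;
                this.firstRowId = firstRowId;
                this.rowCount = rowCount;
            }
        }

        /** Attach each blob file to the data file whose row-id range contains its first row id. */
        static Map<FileRange, List<FileRange>> associate(
                List<FileRange> dataFiles, List<FileRange> blobFiles) {
            TreeMap<Long, FileRange> byFirstRowId = new TreeMap<>();
            for (FileRange dataFile : dataFiles) {
                byFirstRowId.put(dataFile.firstRowId, dataFile);
            }
            Map<FileRange, List<FileRange>> result = new HashMap<>();
            for (FileRange blob : blobFiles) {
                Map.Entry<Long, FileRange> floor = byFirstRowId.floorEntry(blob.firstRowId);
                if (floor == null) {
                    continue; // no data file starts at or before this blob
                }
                FileRange dataFile = floor.getValue();
                long lastRowId = dataFile.firstRowId + dataFile.rowCount - 1;
                if (blob.firstRowId <= lastRowId) {
                    result.computeIfAbsent(dataFile, k -> new ArrayList<>()).add(blob);
                }
            }
            return result;
        }

        public static void main(String[] args) {
            List<FileRange> data = new ArrayList<>();
            data.add(new FileRange("data-0", 0, 100));
            data.add(new FileRange("data-1", 100, 50));
            List<FileRange> blobs = new ArrayList<>();
            blobs.add(new FileRange("blob-0", 0, 100));
            blobs.add(new FileRange("blob-1", 120, 30));
            // blob-0 maps to data-0, blob-1 maps to data-1
            System.out.println(associate(data, blobs).size());
        }
    }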
