This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 21f6a44889 [core] Optimize calculate for DataEvolutionCompactCoordinator (#6867)
21f6a44889 is described below
commit 21f6a448895f0e2f2d723005f0e9586502797bfb
Author: YeJunHao <[email protected]>
AuthorDate: Tue Dec 23 17:29:55 2025 +0800
[core] Optimize calculate for DataEvolutionCompactCoordinator (#6867)
---
.../DataEvolutionCompactCoordinator.java | 197 +++++++++++----------
1 file changed, 108 insertions(+), 89 deletions(-)
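For context on the new planning pass: compactPlan now buckets manifest entries by partition, merges files whose row-id ranges overlap (via RangeHelper.mergeOverlappingRanges), and only then packs the merged groups against targetFileSize through triggerTask. A minimal, self-contained sketch of that range-merge step is shown below; RangeMergeSketch and mergeOverlapping are hypothetical stand-ins, and the behaviour of Paimon's RangeHelper is only inferred from its call sites in this patch.

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.function.ToLongFunction;

    /** Sketch: sort by range start and group items whose row-id ranges reach into each other. */
    final class RangeMergeSketch {

        static <T> List<List<T>> mergeOverlapping(
                List<T> items, ToLongFunction<T> start, ToLongFunction<T> end) {
            // sort by range start, then sweep and cut a new group at every gap
            List<T> sorted = new ArrayList<>(items);
            sorted.sort(Comparator.comparingLong(start));

            List<List<T>> groups = new ArrayList<>();
            List<T> current = new ArrayList<>();
            long currentEnd = Long.MIN_VALUE;
            for (T item : sorted) {
                if (!current.isEmpty() && start.applyAsLong(item) > currentEnd) {
                    groups.add(current);
                    current = new ArrayList<>();
                }
                current.add(item);
                currentEnd = Math.max(currentEnd, end.applyAsLong(item));
            }
            if (!current.isEmpty()) {
                groups.add(current);
            }
            return groups;
        }
    }

Note that the two RangeHelper call sites in the diff use different end functions: the first passes f.nonNullFirstRowId() + f.rowCount() (an exclusive end, so adjacent files land in one range), while the second passes f.nonNullFirstRowId() + f.rowCount() - 1 (an inclusive end, so only genuinely overlapping data files are grouped).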
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
index adc08a4bc8..8aef7b0a6f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
@@ -21,7 +21,6 @@ package org.apache.paimon.append.dataevolution;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.format.blob.BlobFileFormat;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
@@ -36,12 +35,15 @@ import javax.annotation.Nullable;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
+import java.util.TreeMap;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
 
 /** Compact coordinator to compact data evolution table. */
 public class DataEvolutionCompactCoordinator {
@@ -107,12 +109,6 @@ public class DataEvolutionCompactCoordinator {
                     currentMetas.stream()
                             .flatMap(meta -> snapshotReader.readManifest(meta).stream())
                             .collect(Collectors.toList());
-            Comparator<ManifestEntry> comparator =
-                    Comparator.comparingLong((ManifestEntry a) -> a.file().nonNullFirstRowId())
-                            .thenComparingInt(
-                                    a -> BlobFileFormat.isBlobFile(a.fileName()) ? 1 : 0);
-            targetEntries.sort(comparator);
-
             result.addAll(targetEntries);
         }
         return result;
@@ -126,14 +122,6 @@ public class DataEvolutionCompactCoordinator {
         private final long targetFileSize;
         private final long openFileCost;
         private final long compactMinFileNum;
-        private long lastRowIdStart = -1;
-        private long nextRowIdExpected = -1;
-        private long weightSum = 0L;
-        private BinaryRow lastPartition = null;
-        private boolean skipFile = false;
-        private List<DataEvolutionCompactTask> tasks = new ArrayList<>();
-        private List<DataFileMeta> groupFiles = new ArrayList<>();
-        private List<DataFileMeta> blobFiles = new ArrayList<>();
 
         CompactPlanner(
                 boolean compactBlob,
@@ -146,90 +134,121 @@ public class DataEvolutionCompactCoordinator {
             this.compactMinFileNum = compactMinFileNum;
         }
 
-        List<DataEvolutionCompactTask> compactPlan(Iterable<ManifestEntry> entries) {
-            for (ManifestEntry entry : entries) {
-                long rowId = entry.file().nonNullFirstRowId();
-                if (rowId < lastRowIdStart) {
-                    throw new IllegalStateException(
-                            "Files are not in order by rowId. Current file rowId: "
-                                    + rowId
-                                    + ", last file rowId: "
-                                    + lastRowIdStart);
-                } else if (rowId == lastRowIdStart) {
-                    checkArgument(
-                            lastPartition.equals(entry.partition()),
-                            "Inconsistent partition for the same rowId: " + rowId);
-                    if (!skipFile) {
-                        if (BlobFileFormat.isBlobFile(entry.fileName())) {
-                            blobFiles.add(entry.file());
+        List<DataEvolutionCompactTask> compactPlan(List<ManifestEntry> input) {
+            List<DataEvolutionCompactTask> tasks = new ArrayList<>();
+            Map<BinaryRow, List<DataFileMeta>> partitionedFiles = new LinkedHashMap<>();
+            for (ManifestEntry entry : input) {
+                partitionedFiles
+                        .computeIfAbsent(entry.partition(), k -> new ArrayList<>())
+                        .add(entry.file());
+            }
+
+            for (Map.Entry<BinaryRow, List<DataFileMeta>> partitionFiles :
+                    partitionedFiles.entrySet()) {
+                BinaryRow partition = partitionFiles.getKey();
+                List<DataFileMeta> files = partitionFiles.getValue();
+                RangeHelper<DataFileMeta> rangeHelper =
+                        new RangeHelper<>(
+                                DataFileMeta::nonNullFirstRowId,
+                                // merge adjacent files
+                                f -> f.nonNullFirstRowId() + f.rowCount());
+
+                List<List<DataFileMeta>> ranges = rangeHelper.mergeOverlappingRanges(files);
+
+                for (List<DataFileMeta> group : ranges) {
+                    List<DataFileMeta> dataFiles = new ArrayList<>();
+                    List<DataFileMeta> blobFiles = new ArrayList<>();
+                    TreeMap<Long, DataFileMeta> treeMap = new TreeMap<>();
+                    Map<DataFileMeta, List<DataFileMeta>> dataFileToBlobFiles = new HashMap<>();
+                    for (DataFileMeta f : group) {
+                        if (!isBlobFile(f.fileName())) {
+                            treeMap.put(f.nonNullFirstRowId(), f);
+                            dataFiles.add(f);
                         } else {
-                            groupFiles.add(entry.file());
-                            weightSum += Math.max(entry.file().fileSize(), openFileCost);
+                            blobFiles.add(f);
                         }
                     }
-                } else if (rowId < nextRowIdExpected) {
-                    checkArgument(
-                            lastPartition.equals(entry.partition()),
-                            "Inconsistent partition for the same rowId: " + rowId);
-                    checkArgument(
-                            BlobFileFormat.isBlobFile(entry.fileName()),
-                            "Data file found in the middle of blob files for rowId: " + rowId);
-                    if (!skipFile) {
-                        blobFiles.add(entry.file());
-                    }
-                } else {
-                    BinaryRow currentPartition = entry.partition();
-                    long currentWeight = Math.max(entry.file().fileSize(), openFileCost);
-                    // skip big file
-                    skipFile = currentWeight > targetFileSize;
-
-                    // If compaction condition meets, do compaction
-                    if (weightSum > targetFileSize
-                            || rowId > nextRowIdExpected
-                            || !currentPartition.equals(lastPartition)
-                            || skipFile) {
-                        flushAll();
+
+                    if (compactBlob) {
+                        // associate blob files to data files
+                        for (DataFileMeta blobFile : blobFiles) {
+                            Long key = treeMap.floorKey(blobFile.nonNullFirstRowId());
+                            if (key != null) {
+                                DataFileMeta dataFile = treeMap.get(key);
+                                if (blobFile.nonNullFirstRowId() >= dataFile.nonNullFirstRowId()
+                                        && blobFile.nonNullFirstRowId()
+                                                <= dataFile.nonNullFirstRowId()
+                                                        + dataFile.rowCount()
+                                                        - 1) {
+                                    dataFileToBlobFiles
+                                            .computeIfAbsent(dataFile, k -> new ArrayList<>())
+                                            .add(blobFile);
+                                }
+                            }
+                        }
                     }
-                    if (!skipFile) {
-                        weightSum += currentWeight;
-                        groupFiles.add(entry.file());
+                    RangeHelper<DataFileMeta> rangeHelper2 =
+                            new RangeHelper<>(
+                                    DataFileMeta::nonNullFirstRowId,
+                                    // files group
+                                    f -> f.nonNullFirstRowId() + f.rowCount() - 1);
+                    List<List<DataFileMeta>> groupedFiles =
+                            rangeHelper2.mergeOverlappingRanges(dataFiles);
+                    List<DataFileMeta> waitCompactFiles = new ArrayList<>();
+
+                    long weightSum = 0L;
+                    for (List<DataFileMeta> fileGroup : groupedFiles) {
+                        long currentGroupWeight =
+                                fileGroup.stream()
+                                        .mapToLong(d -> Math.max(d.fileSize(), openFileCost))
+                                        .sum();
+                        if (currentGroupWeight > targetFileSize) {
+                            // compact current file group to merge field files
+                            tasks.addAll(triggerTask(fileGroup, partition, dataFileToBlobFiles));
+                            // compact wait compact files
+                            tasks.addAll(
+                                    triggerTask(waitCompactFiles, partition, dataFileToBlobFiles));
+                            waitCompactFiles = new ArrayList<>();
+                            weightSum = 0;
+                        } else {
+                            weightSum += currentGroupWeight;
+                            waitCompactFiles.addAll(fileGroup);
+                            if (weightSum > targetFileSize) {
+                                tasks.addAll(
+                                        triggerTask(
+                                                waitCompactFiles, partition, dataFileToBlobFiles));
+                                waitCompactFiles = new ArrayList<>();
+                                weightSum = 0L;
+                            }
+                        }
                     }
-                    lastRowIdStart = rowId;
-                    nextRowIdExpected = rowId + entry.file().rowCount();
-                    lastPartition = currentPartition;
+                    tasks.addAll(triggerTask(waitCompactFiles, partition, dataFileToBlobFiles));
                 }
             }
-            // do compaction for the last group
-            flushAll();
-
-            List<DataEvolutionCompactTask> result = new ArrayList<>(tasks);
-            tasks = new ArrayList<>();
-            return result;
+            return tasks;
         }
 
-        private void flushAll() {
-            if (!groupFiles.isEmpty()) {
-                if (groupFiles.size() >= compactMinFileNum) {
-                    tasks.add(
-                            new DataEvolutionCompactTask(
-                                    lastPartition, new ArrayList<>(groupFiles), false));
-
-                    if (compactBlob && blobFiles.size() > 1) {
-                        tasks.add(
-                                new DataEvolutionCompactTask(
-                                        lastPartition, new ArrayList<>(blobFiles), true));
-                    }
-                }
-
-                weightSum = 0L;
-                groupFiles = new ArrayList<>();
-                blobFiles = new ArrayList<>();
+        private List<DataEvolutionCompactTask> triggerTask(
+                List<DataFileMeta> dataFiles,
+                BinaryRow partition,
+                Map<DataFileMeta, List<DataFileMeta>> dataFileToBlobFiles) {
+            List<DataEvolutionCompactTask> tasks = new ArrayList<>();
+            if (dataFiles.size() >= compactMinFileNum) {
+                tasks.add(new DataEvolutionCompactTask(partition, dataFiles, false));
             }
-            checkArgument(weightSum == 0L, "Weight sum should be zero after compaction.");
-            checkArgument(groupFiles.isEmpty(), "Group files should be empty.");
-            checkArgument(blobFiles.isEmpty(), "Blob files should be empty.");
+            if (compactBlob) {
+                List<DataFileMeta> blobFiles = new ArrayList<>();
+                for (DataFileMeta dataFile : dataFiles) {
+                    blobFiles.addAll(
+                            dataFileToBlobFiles.getOrDefault(dataFile, Collections.emptyList()));
+                }
+                if (blobFiles.size() >= compactMinFileNum) {
+                    tasks.add(new DataEvolutionCompactTask(partition, blobFiles, true));
                }
            }
+            return tasks;
         }
     }
 }
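The blob association in the new compactPlan keys non-blob data files by their first row id in a TreeMap and uses floorKey to find, for each blob file, the data file with the greatest first row id not exceeding the blob's; the blob is attached only if its first row id also falls inside that data file's row range. A standalone sketch of the same lookup, with a hypothetical FileRange class standing in for DataFileMeta:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    /** Sketch: attach each blob file to the data file whose row-id range contains its first row id. */
    final class BlobAssociationSketch {

        /** Hypothetical stand-in for DataFileMeta: covers rows [firstRowId, firstRowId + rowCount - 1]. */
        static final class FileRange {
            final String name;
            final long firstRowId;
            final long rowCount;

            FileRange(String name, long firstRowId, long rowCount) {
                this.name = name;
                this.firstRowId = firstRowId;
                this.rowCount = rowCount;
            }
        }

        static Map<FileRange, List<FileRange>> associate(
                List<FileRange> dataFiles, List<FileRange> blobFiles) {
            // index data files by first row id so floorKey can propose the covering candidate
            TreeMap<Long, FileRange> byFirstRowId = new TreeMap<>();
            for (FileRange dataFile : dataFiles) {
                byFirstRowId.put(dataFile.firstRowId, dataFile);
            }

            Map<FileRange, List<FileRange>> dataFileToBlobs = new HashMap<>();
            for (FileRange blob : blobFiles) {
                // greatest data-file first row id that is <= the blob's first row id
                Long key = byFirstRowId.floorKey(blob.firstRowId);
                if (key == null) {
                    continue;
                }
                FileRange candidate = byFirstRowId.get(key);
                long lastRowId = candidate.firstRowId + candidate.rowCount - 1;
                if (blob.firstRowId <= lastRowId) {
                    dataFileToBlobs.computeIfAbsent(candidate, k -> new ArrayList<>()).add(blob);
                }
            }
            return dataFileToBlobs;
        }
    }

triggerTask then only emits a blob compaction task when the blob files gathered from a group's data files reach compactMinFileNum, mirroring the threshold applied to the data files themselves.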