This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3e91173cfc [core] Avoid cross-file blob and vector compaction for data
evolution (#7938)
3e91173cfc is described below
commit 3e91173cfcc0ff31fc2ec1d55eb73fcdb996aaab
Author: YeJunHao <[email protected]>
AuthorDate: Sat May 23 21:53:21 2026 +0800
[core] Avoid cross-file blob and vector compaction for data evolution
(#7938)
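This PR prevents standalone Data Evolution dedicated-file compaction
from combining blob or vector-store files that belong to different
regular data-file row-id ranges. Blob and vector-store files are still
merged across data files when the regular data files themselves reach
compactMinFileNum and get their own compaction task; otherwise they
are grouped and compacted separately for each data file.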
---
.../DataEvolutionCompactCoordinator.java | 62 +++++++++++++-----
.../DataEvolutionCompactCoordinatorTest.java | 76 +++++++++++++++++++++-
2 files changed, 121 insertions(+), 17 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
index 837b0c3718..fc65fe95b6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
@@ -356,30 +356,60 @@ public class DataEvolutionCompactCoordinator {
Map<DataFileMeta, List<DataFileMeta>> dataFileToBlobFiles,
Map<DataFileMeta, List<DataFileMeta>>
dataFileToVectorStoreFiles) {
List<DataEvolutionCompactTask> tasks = new ArrayList<>();
- if (dataFiles.size() >= compactMinFileNum) {
+ boolean triggerNormalFile = dataFiles.size() >= compactMinFileNum;
+ if (triggerNormalFile) {
tasks.add(new DataEvolutionCompactTask(partition, dataFiles,
false));
}
if (compactBlob) {
- List<DataFileMeta> blobFiles = new ArrayList<>();
- for (DataFileMeta dataFile : dataFiles) {
- blobFiles.addAll(
- dataFileToBlobFiles.getOrDefault(dataFile,
Collections.emptyList()));
- }
- for (List<DataFileMeta> blobFilesToCompact :
blobFileGroupsToCompact(blobFiles)) {
- tasks.add(new DataEvolutionCompactTask(partition,
blobFilesToCompact, true));
+ if (triggerNormalFile) {
+ List<DataFileMeta> blobFiles = new ArrayList<>();
+ for (DataFileMeta dataFile : dataFiles) {
+ blobFiles.addAll(
+ dataFileToBlobFiles.getOrDefault(
+ dataFile, Collections.emptyList()));
+ }
+ for (List<DataFileMeta> blobFilesToCompact :
+ blobFileGroupsToCompact(blobFiles)) {
+ tasks.add(
+ new DataEvolutionCompactTask(partition,
blobFilesToCompact, true));
+ }
+ } else {
+ for (DataFileMeta dataFile : dataFiles) {
+ for (List<DataFileMeta> blobFilesToCompact :
+ blobFileGroupsToCompact(
+ dataFileToBlobFiles.getOrDefault(
+ dataFile,
Collections.emptyList()))) {
+ tasks.add(
+ new DataEvolutionCompactTask(
+ partition, blobFilesToCompact,
true));
+ }
+ }
}
}
if (compactVector) {
- List<DataFileMeta> vectorStoreFiles = new ArrayList<>();
- for (DataFileMeta dataFile : dataFiles) {
- vectorStoreFiles.addAll(
- dataFileToVectorStoreFiles.getOrDefault(
- dataFile, Collections.emptyList()));
- }
- if (vectorStoreFiles.size() >= compactMinFileNum) {
- tasks.add(new DataEvolutionCompactTask(partition,
vectorStoreFiles, false));
+ if (triggerNormalFile) {
+ List<DataFileMeta> vectorStoreFiles = new ArrayList<>();
+ for (DataFileMeta dataFile : dataFiles) {
+ vectorStoreFiles.addAll(
+ dataFileToVectorStoreFiles.getOrDefault(
+ dataFile, Collections.emptyList()));
+ }
+ if (vectorStoreFiles.size() >= compactMinFileNum) {
+ tasks.add(new DataEvolutionCompactTask(partition,
vectorStoreFiles, false));
+ }
+ } else {
+ for (DataFileMeta dataFile : dataFiles) {
+ List<DataFileMeta> vectorStoreFiles =
+ dataFileToVectorStoreFiles.getOrDefault(
+ dataFile, Collections.emptyList());
+ if (vectorStoreFiles.size() >= compactMinFileNum) {
+ tasks.add(
+ new DataEvolutionCompactTask(
+ partition, vectorStoreFiles,
false));
+ }
+ }
}
}
return tasks;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
index 4946a61539..f2f3a16756 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
@@ -177,7 +177,8 @@ public class DataEvolutionCompactCoordinatorTest {
List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
- // Should have compaction tasks for both data files and blob files
+ // Should have compaction tasks for data files and blob files within
the data compaction
+ // range.
assertThat(tasks.size()).isEqualTo(2);
assertThat(tasks.get(0).compactBefore())
@@ -190,6 +191,56 @@ public class DataEvolutionCompactCoordinatorTest {
entries.get(5).file());
}
+ @Test
+ public void testCompactPlannerDoesNotCompactBlobFilesAcrossDataFiles() {
+ List<ManifestEntry> entries = new ArrayList<>();
+ entries.add(makeEntry("file1.parquet", 0L, 100L, 100));
+ entries.add(makeBlobEntry("file1.blob", 0L, 100L, 100, "pic"));
+ entries.add(makeEntry("file2.parquet", 100L, 100L, 100));
+ entries.add(makeBlobEntry("file2.blob", 100L, 100L, 100, "pic"));
+
+ DataEvolutionCompactCoordinator.CompactPlanner planner =
+ blobPlanner(1024, 1024, 100, rowType(new DataField(1, "pic",
DataTypes.BLOB())));
+
+ List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
+
+ assertThat(tasks).isEmpty();
+
+ planner = blobPlanner(1024, 1024, 2, rowType(new DataField(1, "pic",
DataTypes.BLOB())));
+ tasks = planner.compactPlan(entries);
+
+ assertThat(tasks).hasSize(2);
+ assertThat(tasks.get(0).compactBefore())
+ .containsExactly(entries.get(0).file(), entries.get(2).file());
+ assertThat(tasks.get(1).compactBefore())
+ .containsExactly(entries.get(1).file(), entries.get(3).file());
+ }
+
+ @Test
+ public void
testCompactPlannerDoesNotCompactVectorStoreFilesAcrossDataFiles() {
+ List<ManifestEntry> entries = new ArrayList<>();
+ entries.add(makeEntry("file1.parquet", 0L, 100L, 100));
+ entries.add(makeVectorStoreEntry("file1.vector.json", 0L, 100L, 100));
+ entries.add(makeEntry("file2.parquet", 100L, 100L, 100));
+ entries.add(makeVectorStoreEntry("file2.vector.json", 100L, 100L,
100));
+
+ DataEvolutionCompactCoordinator.CompactPlanner planner =
+ vectorStorePlanner(1024, 1024, 100);
+
+ List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
+
+ assertThat(tasks).isEmpty();
+
+ planner = vectorStorePlanner(1024, 1024, 2);
+ tasks = planner.compactPlan(entries);
+
+ assertThat(tasks).hasSize(2);
+ assertThat(tasks.get(0).compactBefore())
+ .containsExactly(entries.get(0).file(), entries.get(2).file());
+ assertThat(tasks.get(1).compactBefore())
+ .containsExactly(entries.get(1).file(), entries.get(3).file());
+ }
+
@Test
public void testCompactPlannerSkipsSingleFilePerBlobField() {
List<ManifestEntry> entries = new ArrayList<>();
@@ -468,6 +519,23 @@ public class DataEvolutionCompactCoordinatorTest {
writeCol == null ? null :
Collections.singletonList(writeCol)));
}
+ private ManifestEntry makeVectorStoreEntry(
+ String fileName, long firstRowId, long rowCount, long fileSize) {
+ return ManifestEntry.create(
+ FileKind.ADD,
+ BinaryRow.EMPTY_ROW,
+ 0,
+ 0,
+ createDataFileMeta(
+ fileName,
+ firstRowId,
+ rowCount,
+ 0,
+ fileSize,
+ 0,
+ Collections.singletonList("vec")));
+ }
+
private DataFileMeta createDataFileMeta(
String fileName, long firstRowId, long rowCount, long maxSeq, long
fileSize) {
return createDataFileMeta(fileName, firstRowId, rowCount, maxSeq,
fileSize, 0, null);
@@ -537,6 +605,12 @@ public class DataEvolutionCompactCoordinatorTest {
fieldIds(currentRowType));
}
+ private DataEvolutionCompactCoordinator.CompactPlanner vectorStorePlanner(
+ long targetFileSize, long openFileCost, long compactMinFileNum) {
+ return new DataEvolutionCompactCoordinator.CompactPlanner(
+ false, true, targetFileSize, openFileCost, compactMinFileNum);
+ }
+
private RowType rowType(DataField... fields) {
return new RowType(Arrays.asList(fields));
}