This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e91173cfc [core] Avoid cross-file blob and vector compaction for data evolution (#7938)
3e91173cfc is described below

commit 3e91173cfcc0ff31fc2ec1d55eb73fcdb996aaab
Author: YeJunHao <[email protected]>
AuthorDate: Sat May 23 21:53:21 2026 +0800

    [core] Avoid cross-file blob and vector compaction for data evolution (#7938)
    
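    This PR prevents standalone Data Evolution dedicated-file compaction
    from combining blob or vector-store files that belong to different
    regular data-file row-id ranges. Blob and vector-store files are now
    merged across data files only when those regular data files are
    themselves compacted together (i.e. their count reaches
    compactMinFileNum); otherwise they are grouped per data file.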
---
 .../DataEvolutionCompactCoordinator.java           | 62 +++++++++++++-----
 .../DataEvolutionCompactCoordinatorTest.java       | 76 +++++++++++++++++++++-
 2 files changed, 121 insertions(+), 17 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
index 837b0c3718..fc65fe95b6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
@@ -356,30 +356,60 @@ public class DataEvolutionCompactCoordinator {
                 Map<DataFileMeta, List<DataFileMeta>> dataFileToBlobFiles,
                 Map<DataFileMeta, List<DataFileMeta>> 
dataFileToVectorStoreFiles) {
             List<DataEvolutionCompactTask> tasks = new ArrayList<>();
-            if (dataFiles.size() >= compactMinFileNum) {
+            boolean triggerNormalFile = dataFiles.size() >= compactMinFileNum;
+            if (triggerNormalFile) {
                 tasks.add(new DataEvolutionCompactTask(partition, dataFiles, 
false));
             }
 
             if (compactBlob) {
-                List<DataFileMeta> blobFiles = new ArrayList<>();
-                for (DataFileMeta dataFile : dataFiles) {
-                    blobFiles.addAll(
-                            dataFileToBlobFiles.getOrDefault(dataFile, 
Collections.emptyList()));
-                }
-                for (List<DataFileMeta> blobFilesToCompact : 
blobFileGroupsToCompact(blobFiles)) {
-                    tasks.add(new DataEvolutionCompactTask(partition, 
blobFilesToCompact, true));
+                if (triggerNormalFile) {
+                    List<DataFileMeta> blobFiles = new ArrayList<>();
+                    for (DataFileMeta dataFile : dataFiles) {
+                        blobFiles.addAll(
+                                dataFileToBlobFiles.getOrDefault(
+                                        dataFile, Collections.emptyList()));
+                    }
+                    for (List<DataFileMeta> blobFilesToCompact :
+                            blobFileGroupsToCompact(blobFiles)) {
+                        tasks.add(
+                                new DataEvolutionCompactTask(partition, 
blobFilesToCompact, true));
+                    }
+                } else {
+                    for (DataFileMeta dataFile : dataFiles) {
+                        for (List<DataFileMeta> blobFilesToCompact :
+                                blobFileGroupsToCompact(
+                                        dataFileToBlobFiles.getOrDefault(
+                                                dataFile, 
Collections.emptyList()))) {
+                            tasks.add(
+                                    new DataEvolutionCompactTask(
+                                            partition, blobFilesToCompact, 
true));
+                        }
+                    }
                 }
             }
 
             if (compactVector) {
-                List<DataFileMeta> vectorStoreFiles = new ArrayList<>();
-                for (DataFileMeta dataFile : dataFiles) {
-                    vectorStoreFiles.addAll(
-                            dataFileToVectorStoreFiles.getOrDefault(
-                                    dataFile, Collections.emptyList()));
-                }
-                if (vectorStoreFiles.size() >= compactMinFileNum) {
-                    tasks.add(new DataEvolutionCompactTask(partition, 
vectorStoreFiles, false));
+                if (triggerNormalFile) {
+                    List<DataFileMeta> vectorStoreFiles = new ArrayList<>();
+                    for (DataFileMeta dataFile : dataFiles) {
+                        vectorStoreFiles.addAll(
+                                dataFileToVectorStoreFiles.getOrDefault(
+                                        dataFile, Collections.emptyList()));
+                    }
+                    if (vectorStoreFiles.size() >= compactMinFileNum) {
+                        tasks.add(new DataEvolutionCompactTask(partition, 
vectorStoreFiles, false));
+                    }
+                } else {
+                    for (DataFileMeta dataFile : dataFiles) {
+                        List<DataFileMeta> vectorStoreFiles =
+                                dataFileToVectorStoreFiles.getOrDefault(
+                                        dataFile, Collections.emptyList());
+                        if (vectorStoreFiles.size() >= compactMinFileNum) {
+                            tasks.add(
+                                    new DataEvolutionCompactTask(
+                                            partition, vectorStoreFiles, 
false));
+                        }
+                    }
                 }
             }
             return tasks;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
index 4946a61539..f2f3a16756 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
@@ -177,7 +177,8 @@ public class DataEvolutionCompactCoordinatorTest {
 
         List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
 
-        // Should have compaction tasks for both data files and blob files
+        // Should have compaction tasks for data files and blob files within 
the data compaction
+        // range.
         assertThat(tasks.size()).isEqualTo(2);
 
         assertThat(tasks.get(0).compactBefore())
@@ -190,6 +191,56 @@ public class DataEvolutionCompactCoordinatorTest {
                         entries.get(5).file());
     }
 
+    @Test
+    public void testCompactPlannerDoesNotCompactBlobFilesAcrossDataFiles() {
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(makeEntry("file1.parquet", 0L, 100L, 100));
+        entries.add(makeBlobEntry("file1.blob", 0L, 100L, 100, "pic"));
+        entries.add(makeEntry("file2.parquet", 100L, 100L, 100));
+        entries.add(makeBlobEntry("file2.blob", 100L, 100L, 100, "pic"));
+
+        DataEvolutionCompactCoordinator.CompactPlanner planner =
+                blobPlanner(1024, 1024, 100, rowType(new DataField(1, "pic", 
DataTypes.BLOB())));
+
+        List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
+
+        assertThat(tasks).isEmpty();
+
+        planner = blobPlanner(1024, 1024, 2, rowType(new DataField(1, "pic", 
DataTypes.BLOB())));
+        tasks = planner.compactPlan(entries);
+
+        assertThat(tasks).hasSize(2);
+        assertThat(tasks.get(0).compactBefore())
+                .containsExactly(entries.get(0).file(), entries.get(2).file());
+        assertThat(tasks.get(1).compactBefore())
+                .containsExactly(entries.get(1).file(), entries.get(3).file());
+    }
+
+    @Test
+    public void 
testCompactPlannerDoesNotCompactVectorStoreFilesAcrossDataFiles() {
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(makeEntry("file1.parquet", 0L, 100L, 100));
+        entries.add(makeVectorStoreEntry("file1.vector.json", 0L, 100L, 100));
+        entries.add(makeEntry("file2.parquet", 100L, 100L, 100));
+        entries.add(makeVectorStoreEntry("file2.vector.json", 100L, 100L, 
100));
+
+        DataEvolutionCompactCoordinator.CompactPlanner planner =
+                vectorStorePlanner(1024, 1024, 100);
+
+        List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
+
+        assertThat(tasks).isEmpty();
+
+        planner = vectorStorePlanner(1024, 1024, 2);
+        tasks = planner.compactPlan(entries);
+
+        assertThat(tasks).hasSize(2);
+        assertThat(tasks.get(0).compactBefore())
+                .containsExactly(entries.get(0).file(), entries.get(2).file());
+        assertThat(tasks.get(1).compactBefore())
+                .containsExactly(entries.get(1).file(), entries.get(3).file());
+    }
+
     @Test
     public void testCompactPlannerSkipsSingleFilePerBlobField() {
         List<ManifestEntry> entries = new ArrayList<>();
@@ -468,6 +519,23 @@ public class DataEvolutionCompactCoordinatorTest {
                         writeCol == null ? null : 
Collections.singletonList(writeCol)));
     }
 
+    private ManifestEntry makeVectorStoreEntry(
+            String fileName, long firstRowId, long rowCount, long fileSize) {
+        return ManifestEntry.create(
+                FileKind.ADD,
+                BinaryRow.EMPTY_ROW,
+                0,
+                0,
+                createDataFileMeta(
+                        fileName,
+                        firstRowId,
+                        rowCount,
+                        0,
+                        fileSize,
+                        0,
+                        Collections.singletonList("vec")));
+    }
+
     private DataFileMeta createDataFileMeta(
             String fileName, long firstRowId, long rowCount, long maxSeq, long 
fileSize) {
         return createDataFileMeta(fileName, firstRowId, rowCount, maxSeq, 
fileSize, 0, null);
@@ -537,6 +605,12 @@ public class DataEvolutionCompactCoordinatorTest {
                 fieldIds(currentRowType));
     }
 
+    private DataEvolutionCompactCoordinator.CompactPlanner vectorStorePlanner(
+            long targetFileSize, long openFileCost, long compactMinFileNum) {
+        return new DataEvolutionCompactCoordinator.CompactPlanner(
+                false, true, targetFileSize, openFileCost, compactMinFileNum);
+    }
+
     private RowType rowType(DataField... fields) {
         return new RowType(Arrays.asList(fields));
     }

Reply via email to