This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 235aac83a5 [core] Fix data evolution compact partition filtering (#8169)
235aac83a5 is described below

commit 235aac83a557c476d908331edcbd8fbb60322fde
Author: YeJunHao <[email protected]>
AuthorDate: Tue Jun 9 13:33:37 2026 +0800

    [core] Fix data evolution compact partition filtering (#8169)
---
 .../DataEvolutionCompactCoordinator.java           |  2 +-
 .../DataEvolutionCompactCoordinatorTest.java       | 83 ++++++++++++++++++++++
 2 files changed, 84 insertions(+), 1 deletion(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
index c41493564c..3c1496b851 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
@@ -86,7 +86,7 @@ public class DataEvolutionCompactCoordinator {
         this.scanner =
                 new CompactScanner(
                         table.newSnapshotReader().withPartitionFilter(partitionPredicate),
-                        table.store().newScan());
+                        table.store().newScan().withPartitionFilter(partitionPredicate));
         this.planner =
                 new CompactPlanner(
                         compactBlob,
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
index fe1b2dfc3e..3c1ba1f796 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
@@ -52,6 +52,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.LongFunction;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -424,6 +425,7 @@ public class DataEvolutionCompactCoordinatorTest {
         when(snapshotReader.manifestsReader()).thenReturn(manifestsReader);
         when(table.store()).thenReturn(fileStore);
         when(fileStore.newScan()).thenReturn(scan);
+        when(scan.withPartitionFilter((PartitionPredicate) null)).thenReturn(scan);
 
         ManifestFileMeta metaWithNullRowId =
                 new ManifestFileMeta(
@@ -471,6 +473,87 @@ public class DataEvolutionCompactCoordinatorTest {
                 .containsExactly(entry1.file().fileName(), entry2.file().fileName());
     }
 
+    @Test
+    public void testPlanWithPartitionFilterDoesNotCompactOtherPartitions() {
+        FileStoreTable table = mock(FileStoreTable.class);
+        SnapshotReader snapshotReader = mock(SnapshotReader.class);
+        SnapshotManager snapshotManager = mock(SnapshotManager.class);
+        Snapshot snapshot = mock(Snapshot.class);
+        ManifestsReader manifestsReader = mock(ManifestsReader.class);
+        FileStore fileStore = mock(FileStore.class);
+        FileStoreScan scan = mock(FileStoreScan.class);
+        PartitionPredicate partitionPredicate = mock(PartitionPredicate.class);
+        AtomicBoolean entryPartitionFilterApplied = new AtomicBoolean(false);
+
+        Options options = new Options();
+        options.set("target-file-size", "1 kb");
+        options.set("source.split.open-file-cost", "1 b");
+        options.set("compaction.min.file-num", "2");
+        when(table.coreOptions()).thenReturn(new CoreOptions(options));
+        when(table.newSnapshotReader()).thenReturn(snapshotReader);
+        when(snapshotReader.withPartitionFilter(partitionPredicate)).thenReturn(snapshotReader);
+        when(snapshotReader.snapshotManager()).thenReturn(snapshotManager);
+        when(snapshotManager.latestSnapshot()).thenReturn(snapshot);
+        when(snapshotReader.manifestsReader()).thenReturn(manifestsReader);
+        when(table.store()).thenReturn(fileStore);
+        when(fileStore.newScan()).thenReturn(scan);
+        when(scan.withPartitionFilter(partitionPredicate))
+                .thenAnswer(
+                        invocation -> {
+                            entryPartitionFilterApplied.set(true);
+                            return scan;
+                        });
+
+        ManifestFileMeta sharedManifest =
+                new ManifestFileMeta(
+                        "shared-manifest",
+                        1L,
+                        4L,
+                        0L,
+                        StatsTestUtils.newEmptySimpleStats(),
+                        0L,
+                        null,
+                        null,
+                        null,
+                        null,
+                        0L,
+                        399L);
+        when(manifestsReader.read(snapshot, ScanMode.ALL))
+                .thenReturn(
+                        new ManifestsReader.Result(
+                                snapshot,
+                                Collections.singletonList(sharedManifest),
+                                Collections.singletonList(sharedManifest)));
+
+        BinaryRow partitionA = BinaryRow.singleColumn(0);
+        BinaryRow partitionB = BinaryRow.singleColumn(1);
+        ManifestEntry a1 = makeEntry(partitionA, "a-1.parquet", 0L, 100L, 600);
+        ManifestEntry a2 = makeEntry(partitionA, "a-2.parquet", 100L, 100L, 600);
+        ManifestEntry b1 = makeEntry(partitionB, "b-1.parquet", 0L, 100L, 600);
+        ManifestEntry b2 = makeEntry(partitionB, "b-2.parquet", 100L, 100L, 600);
+        when(scan.readFileIterator(Collections.singletonList(sharedManifest)))
+                .thenAnswer(
+                        invocation ->
+                                Arrays.asList(a1, a2, b1, b2).stream()
+                                        .filter(
+                                                entry ->
+                                                        !entryPartitionFilterApplied.get()
+                                                                || partitionPredicate.test(
+                                                                        entry.partition()))
+                                        .iterator());
+        when(partitionPredicate.test(partitionA)).thenReturn(false);
+        when(partitionPredicate.test(partitionB)).thenReturn(true);
+
+        DataEvolutionCompactCoordinator coordinator =
+                new DataEvolutionCompactCoordinator(table, partitionPredicate, false, false);
+        List<DataEvolutionCompactTask> tasks = coordinator.plan();
+
+        assertThat(tasks).hasSize(1);
+        assertThat(tasks.get(0).partition()).isEqualTo(partitionB);
+        assertThat(tasks.get(0).compactBefore().stream().map(DataFileMeta::fileName))
+                .containsExactly(b1.file().fileName(), b2.file().fileName());
+    }
+
     private ManifestEntry makeEntry(
             String fileName, long firstRowId, long rowCount, long fileSize) {
         return makeEntryWithSize(

Reply via email to