This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 235aac83a5 [core] Fix data evolution compact partition filtering (#8169)
235aac83a5 is described below
commit 235aac83a557c476d908331edcbd8fbb60322fde
Author: YeJunHao <[email protected]>
AuthorDate: Tue Jun 9 13:33:37 2026 +0800
[core] Fix data evolution compact partition filtering (#8169)
---
.../DataEvolutionCompactCoordinator.java | 2 +-
.../DataEvolutionCompactCoordinatorTest.java | 83 ++++++++++++++++++++++
2 files changed, 84 insertions(+), 1 deletion(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
index c41493564c..3c1496b851 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
@@ -86,7 +86,7 @@ public class DataEvolutionCompactCoordinator {
this.scanner =
new CompactScanner(
table.newSnapshotReader().withPartitionFilter(partitionPredicate),
- table.store().newScan());
+ table.store().newScan().withPartitionFilter(partitionPredicate));
this.planner =
new CompactPlanner(
compactBlob,
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
index fe1b2dfc3e..3c1ba1f796 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
@@ -52,6 +52,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongFunction;
import static org.assertj.core.api.Assertions.assertThat;
@@ -424,6 +425,7 @@ public class DataEvolutionCompactCoordinatorTest {
when(snapshotReader.manifestsReader()).thenReturn(manifestsReader);
when(table.store()).thenReturn(fileStore);
when(fileStore.newScan()).thenReturn(scan);
+ when(scan.withPartitionFilter((PartitionPredicate) null)).thenReturn(scan);
ManifestFileMeta metaWithNullRowId =
new ManifestFileMeta(
@@ -471,6 +473,87 @@ public class DataEvolutionCompactCoordinatorTest {
.containsExactly(entry1.file().fileName(), entry2.file().fileName());
}
+ @Test
+ public void testPlanWithPartitionFilterDoesNotCompactOtherPartitions() {
+ FileStoreTable table = mock(FileStoreTable.class);
+ SnapshotReader snapshotReader = mock(SnapshotReader.class);
+ SnapshotManager snapshotManager = mock(SnapshotManager.class);
+ Snapshot snapshot = mock(Snapshot.class);
+ ManifestsReader manifestsReader = mock(ManifestsReader.class);
+ FileStore fileStore = mock(FileStore.class);
+ FileStoreScan scan = mock(FileStoreScan.class);
+ PartitionPredicate partitionPredicate = mock(PartitionPredicate.class);
+ AtomicBoolean entryPartitionFilterApplied = new AtomicBoolean(false);
+
+ Options options = new Options();
+ options.set("target-file-size", "1 kb");
+ options.set("source.split.open-file-cost", "1 b");
+ options.set("compaction.min.file-num", "2");
+ when(table.coreOptions()).thenReturn(new CoreOptions(options));
+ when(table.newSnapshotReader()).thenReturn(snapshotReader);
+ when(snapshotReader.withPartitionFilter(partitionPredicate)).thenReturn(snapshotReader);
+ when(snapshotReader.snapshotManager()).thenReturn(snapshotManager);
+ when(snapshotManager.latestSnapshot()).thenReturn(snapshot);
+ when(snapshotReader.manifestsReader()).thenReturn(manifestsReader);
+ when(table.store()).thenReturn(fileStore);
+ when(fileStore.newScan()).thenReturn(scan);
+ when(scan.withPartitionFilter(partitionPredicate))
+ .thenAnswer(
+ invocation -> {
+ entryPartitionFilterApplied.set(true);
+ return scan;
+ });
+
+ ManifestFileMeta sharedManifest =
+ new ManifestFileMeta(
+ "shared-manifest",
+ 1L,
+ 4L,
+ 0L,
+ StatsTestUtils.newEmptySimpleStats(),
+ 0L,
+ null,
+ null,
+ null,
+ null,
+ 0L,
+ 399L);
+ when(manifestsReader.read(snapshot, ScanMode.ALL))
+ .thenReturn(
+ new ManifestsReader.Result(
+ snapshot,
+ Collections.singletonList(sharedManifest),
+ Collections.singletonList(sharedManifest)));
+
+ BinaryRow partitionA = BinaryRow.singleColumn(0);
+ BinaryRow partitionB = BinaryRow.singleColumn(1);
+ ManifestEntry a1 = makeEntry(partitionA, "a-1.parquet", 0L, 100L, 600);
+ ManifestEntry a2 = makeEntry(partitionA, "a-2.parquet", 100L, 100L, 600);
+ ManifestEntry b1 = makeEntry(partitionB, "b-1.parquet", 0L, 100L, 600);
+ ManifestEntry b2 = makeEntry(partitionB, "b-2.parquet", 100L, 100L, 600);
+ when(scan.readFileIterator(Collections.singletonList(sharedManifest)))
+ .thenAnswer(
+ invocation ->
+ Arrays.asList(a1, a2, b1, b2).stream()
+ .filter(
+ entry ->
+ !entryPartitionFilterApplied.get()
+ || partitionPredicate.test(
+ entry.partition()))
+ .iterator());
+ when(partitionPredicate.test(partitionA)).thenReturn(false);
+ when(partitionPredicate.test(partitionB)).thenReturn(true);
+
+ DataEvolutionCompactCoordinator coordinator =
+ new DataEvolutionCompactCoordinator(table, partitionPredicate, false, false);
+ List<DataEvolutionCompactTask> tasks = coordinator.plan();
+
+ assertThat(tasks).hasSize(1);
+ assertThat(tasks.get(0).partition()).isEqualTo(partitionB);
+ assertThat(tasks.get(0).compactBefore().stream().map(DataFileMeta::fileName))
+ .containsExactly(b1.file().fileName(), b2.file().fileName());
+ }
+
private ManifestEntry makeEntry(
String fileName, long firstRowId, long rowCount, long fileSize) {
return makeEntryWithSize(