This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch release-0.8
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit b45b01cd3829eaad926b8b6ca9eebb928a3a8d1d Author: YeJunHao <[email protected]> AuthorDate: Thu May 30 13:41:12 2024 +0800 [core] [bug] Fix Partition Predicate null values process. (#3431) --- .../paimon/partition/PartitionPredicate.java | 4 ++ .../operation/AppendOnlyFileStoreWriteTest.java | 72 +++++++++++++++++++++- 2 files changed, 73 insertions(+), 3 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java index 8bd73b7eb..e2d1fb018 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java @@ -133,6 +133,10 @@ public interface PartitionPredicate { } else { min[i] = builder.greaterOrEqual(i, checkNotNull(stats.minValue())); max[i] = builder.lessOrEqual(i, checkNotNull(stats.maxValue())); + if (stats.nullCount() > 0) { + min[i] = PredicateBuilder.or(builder.isNull(i), min[i]); + max[i] = PredicateBuilder.or(builder.isNull(i), max[i]); + } } } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java index 806e0ec94..3551b4067 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java @@ -42,7 +42,7 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -import java.util.Collections; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Random; @@ -175,7 +175,39 @@ public class AppendOnlyFileStoreWriteTest { } @Test - public void testScanFilterWithNullPartition() throws Exception { + public void testScanFilterWithMixedPartitionWrite() throws Exception { + 
FileStoreTable table = createFileStoreTable(); + + AppendOnlyFileStoreWrite write = (AppendOnlyFileStoreWrite) table.store().newWrite("ss"); + StreamTableCommit commit = table.newStreamWriteBuilder().newCommit(); + write.withExecutionMode(false); + + for (int i = 0; i < 100; i++) { + if (i == 0) { + write.write(nullPartition(), i, GenericRow.of(null, i, i)); + commit.commit(i, write.prepareCommit(false, i)); + } else { + write.write(partition(1), i, GenericRow.of(null, i, i)); + commit.commit(i, write.prepareCommit(false, i)); + } + } + + BinaryRow binaryRow = nullPartition(); + FileStoreScan scan = table.store().newScan(); + List<SimpleFileEntry> l0 = + scan.withPartitionFilter(Arrays.asList(binaryRow)).readSimpleEntries(); + Assertions.assertThat(l0.size()).isEqualTo(1); + + BinaryRow binaryRow1 = partition(1); + l0 = scan.withPartitionFilter(Arrays.asList(binaryRow, binaryRow1)).readSimpleEntries(); + Assertions.assertThat(l0.size()).isEqualTo(100); + + l0 = scan.withPartitionFilter(Arrays.asList(binaryRow1)).readSimpleEntries(); + Assertions.assertThat(l0.size()).isEqualTo(99); + } + + @Test + public void testScanFilterWithAllNullPartitionWrite() throws Exception { FileStoreTable table = createFileStoreTable(); AppendOnlyFileStoreWrite write = (AppendOnlyFileStoreWrite) table.store().newWrite("ss"); @@ -190,7 +222,41 @@ public class AppendOnlyFileStoreWriteTest { BinaryRow binaryRow = nullPartition(); FileStoreScan scan = table.store().newScan(); List<SimpleFileEntry> l0 = - scan.withPartitionFilter(Collections.singletonList(binaryRow)).readSimpleEntries(); + scan.withPartitionFilter(Arrays.asList(binaryRow)).readSimpleEntries(); + Assertions.assertThat(l0.size()).isEqualTo(100); + + BinaryRow binaryRow1 = partition(1); + l0 = scan.withPartitionFilter(Arrays.asList(binaryRow, binaryRow1)).readSimpleEntries(); + Assertions.assertThat(l0.size()).isEqualTo(100); + + l0 = scan.withPartitionFilter(Arrays.asList(binaryRow1)).readSimpleEntries(); + 
Assertions.assertThat(l0.size()).isEqualTo(0); + } + + @Test + public void testScanFilterWithNoneNullPartitionWrite() throws Exception { + FileStoreTable table = createFileStoreTable(); + + AppendOnlyFileStoreWrite write = (AppendOnlyFileStoreWrite) table.store().newWrite("ss"); + StreamTableCommit commit = table.newStreamWriteBuilder().newCommit(); + write.withExecutionMode(false); + + for (int i = 0; i < 100; i++) { + write.write(partition(1), i, GenericRow.of(null, i, i)); + commit.commit(i, write.prepareCommit(false, i)); + } + + BinaryRow binaryRow = nullPartition(); + FileStoreScan scan = table.store().newScan(); + List<SimpleFileEntry> l0 = + scan.withPartitionFilter(Arrays.asList(binaryRow)).readSimpleEntries(); + Assertions.assertThat(l0.size()).isEqualTo(0); + + BinaryRow binaryRow1 = partition(1); + l0 = scan.withPartitionFilter(Arrays.asList(binaryRow, binaryRow1)).readSimpleEntries(); + Assertions.assertThat(l0.size()).isEqualTo(100); + + l0 = scan.withPartitionFilter(Arrays.asList(binaryRow1)).readSimpleEntries(); Assertions.assertThat(l0.size()).isEqualTo(100); }
