This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f8f700148 [core] [bug] Fix Partition Predicate null values process. (#3431)
f8f700148 is described below
commit f8f70014810bcde74247b6bf42a0332d24904e08
Author: YeJunHao <[email protected]>
AuthorDate: Thu May 30 13:41:12 2024 +0800
[core] [bug] Fix Partition Predicate null values process. (#3431)
---
.../paimon/partition/PartitionPredicate.java | 4 ++
.../operation/AppendOnlyFileStoreWriteTest.java | 72 +++++++++++++++++++++-
2 files changed, 73 insertions(+), 3 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
index c2434d4b3..b8339fee2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
@@ -133,6 +133,10 @@ public interface PartitionPredicate {
} else {
min[i] = builder.greaterOrEqual(i, checkNotNull(stats.min()));
max[i] = builder.lessOrEqual(i, checkNotNull(stats.max()));
+ if (stats.nullCount() > 0) {
+ min[i] = PredicateBuilder.or(builder.isNull(i), min[i]);
+ max[i] = PredicateBuilder.or(builder.isNull(i), max[i]);
+ }
}
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java
index eb1f208bd..438b3699b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java
@@ -42,7 +42,7 @@ import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import java.util.Collections;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -176,7 +176,39 @@ public class AppendOnlyFileStoreWriteTest {
}
@Test
- public void testScanFilterWithNullPartition() throws Exception {
+ public void testScanFilterWithMixedPartitionWrite() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+
+ AppendOnlyFileStoreWrite write = (AppendOnlyFileStoreWrite) table.store().newWrite("ss");
+ StreamTableCommit commit = table.newStreamWriteBuilder().newCommit();
+ write.withExecutionMode(false);
+
+ for (int i = 0; i < 100; i++) {
+ if (i == 0) {
+ write.write(nullPartition(), i, GenericRow.of(null, i, i));
+ commit.commit(i, write.prepareCommit(false, i));
+ } else {
+ write.write(partition(1), i, GenericRow.of(null, i, i));
+ commit.commit(i, write.prepareCommit(false, i));
+ }
+ }
+
+ BinaryRow binaryRow = nullPartition();
+ FileStoreScan scan = table.store().newScan();
+ List<SimpleFileEntry> l0 =
+ scan.withPartitionFilter(Arrays.asList(binaryRow)).readSimpleEntries();
+ Assertions.assertThat(l0.size()).isEqualTo(1);
+
+ BinaryRow binaryRow1 = partition(1);
+ l0 = scan.withPartitionFilter(Arrays.asList(binaryRow, binaryRow1)).readSimpleEntries();
+ Assertions.assertThat(l0.size()).isEqualTo(100);
+
+ l0 = scan.withPartitionFilter(Arrays.asList(binaryRow1)).readSimpleEntries();
+ Assertions.assertThat(l0.size()).isEqualTo(99);
+ }
+
+ @Test
+ public void testScanFilterWithAllNullPartitionWrite() throws Exception {
FileStoreTable table = createFileStoreTable();
AppendOnlyFileStoreWrite write = (AppendOnlyFileStoreWrite) table.store().newWrite("ss");
@@ -191,7 +223,41 @@ public class AppendOnlyFileStoreWriteTest {
BinaryRow binaryRow = nullPartition();
FileStoreScan scan = table.store().newScan();
List<SimpleFileEntry> l0 =
- scan.withPartitionFilter(Collections.singletonList(binaryRow)).readSimpleEntries();
+ scan.withPartitionFilter(Arrays.asList(binaryRow)).readSimpleEntries();
+ Assertions.assertThat(l0.size()).isEqualTo(100);
+
+ BinaryRow binaryRow1 = partition(1);
+ l0 = scan.withPartitionFilter(Arrays.asList(binaryRow, binaryRow1)).readSimpleEntries();
+ Assertions.assertThat(l0.size()).isEqualTo(100);
+
+ l0 = scan.withPartitionFilter(Arrays.asList(binaryRow1)).readSimpleEntries();
+ Assertions.assertThat(l0.size()).isEqualTo(0);
+ }
+
+ @Test
+ public void testScanFilterWithNoneNullPartitionWrite() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+
+ AppendOnlyFileStoreWrite write = (AppendOnlyFileStoreWrite) table.store().newWrite("ss");
+ StreamTableCommit commit = table.newStreamWriteBuilder().newCommit();
+ write.withExecutionMode(false);
+
+ for (int i = 0; i < 100; i++) {
+ write.write(partition(1), i, GenericRow.of(null, i, i));
+ commit.commit(i, write.prepareCommit(false, i));
+ }
+
+ BinaryRow binaryRow = nullPartition();
+ FileStoreScan scan = table.store().newScan();
+ List<SimpleFileEntry> l0 =
+ scan.withPartitionFilter(Arrays.asList(binaryRow)).readSimpleEntries();
+ Assertions.assertThat(l0.size()).isEqualTo(0);
+
+ BinaryRow binaryRow1 = partition(1);
+ l0 = scan.withPartitionFilter(Arrays.asList(binaryRow, binaryRow1)).readSimpleEntries();
+ Assertions.assertThat(l0.size()).isEqualTo(100);
+
+ l0 = scan.withPartitionFilter(Arrays.asList(binaryRow1)).readSimpleEntries();
Assertions.assertThat(l0.size()).isEqualTo(100);
}