This is an automated email from the ASF dual-hosted git repository. junhao pushed a commit to branch release-0.8 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 948310d544006e273751ae57a50339b834dcc865 Author: YeJunHao <[email protected]> AuthorDate: Wed May 29 16:30:25 2024 +0800 [core] Fix null Partition values to Predicate conversion (#3421) --- .../apache/paimon/predicate/PredicateBuilder.java | 12 ++++++--- .../utils/RowDataToObjectArrayConverter.java | 2 +- .../paimon/partition/PartitionPredicate.java | 7 +++-- .../table/source/snapshot/SnapshotReaderImpl.java | 4 ++- .../operation/AppendOnlyFileStoreWriteTest.java | 31 ++++++++++++++++++++++ .../flink/lookup/FileStoreLookupFunction.java | 5 ++-- 6 files changed, 50 insertions(+), 11 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java index f6410adca..2f50c05eb 100644 --- a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java @@ -368,10 +368,12 @@ public class PredicateBuilder { for (Map.Entry<String, String> entry : map.entrySet()) { int idx = fieldNames.indexOf(entry.getKey()); Object literal = TypeUtils.castFromString(entry.getValue(), rowType.getTypeAt(idx)); + Predicate predicateTemp = + literal == null ? builder.isNull(idx) : builder.equal(idx, literal); if (predicate == null) { - predicate = builder.equal(idx, literal); + predicate = predicateTemp; } else { - predicate = PredicateBuilder.and(predicate, builder.equal(idx, literal)); + predicate = PredicateBuilder.and(predicate, predicateTemp); } } return predicate; @@ -394,10 +396,12 @@ public class PredicateBuilder { PredicateBuilder builder = new PredicateBuilder(partitionType); Object[] literals = converter.convert(partition); for (int i = 0; i < literals.length; i++) { + Predicate predicateTemp = + literals[i] == null ? builder.isNull(i) : builder.equal(i, literals[i]); if (predicate == null) { - predicate = builder.equal(i, literals[i]); + predicate = predicateTemp; } else { - predicate = PredicateBuilder.and(predicate, builder.equal(i, literals[i])); + predicate = PredicateBuilder.and(predicate, predicateTemp); } } diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java b/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java index a18ec4249..436a51ada 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java @@ -75,7 +75,7 @@ public class RowDataToObjectArrayConverter implements Serializable { Object[] partitionObjects = convert(binaryRow); for (int i = 0; i < getArity(); i++) { Object o = partitionObjects[i]; - fieldPredicates.add(builder.equal(i, o)); + fieldPredicates.add(o == null ? builder.isNull(i) : builder.equal(i, o)); } return PredicateBuilder.and(fieldPredicates); } diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java index 1675c6c20..663a4434b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java @@ -121,8 +121,11 @@ public interface PartitionPredicate { PredicateBuilder builder = new PredicateBuilder(partitionType); for (int i = 0; i < collectors.length; i++) { FieldStats stats = collectors[i].result(); - min[i] = builder.greaterOrEqual(i, stats.minValue()); - max[i] = builder.lessOrEqual(i, stats.maxValue()); + Object minValue = stats.minValue(); + Object maxValue = stats.maxValue(); + + min[i] = minValue == null ? builder.isNull(i) : builder.greaterOrEqual(i, minValue); + max[i] = maxValue == null ? builder.isNull(i) : builder.lessOrEqual(i, maxValue); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index 2ed258e85..3136bce5f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -155,7 +155,9 @@ public class SnapshotReaderImpl implements SnapshotReader { m.getValue(), rowType.getTypeAt(index), false); - return predicateBuilder.equal(index, value); + return value == null + ? predicateBuilder.isNull(index) + : predicateBuilder.equal(index, value); }) .collect(Collectors.toList()); scan.withPartitionFilter(PredicateBuilder.and(partitionFilters)); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java index c38fb4fba..806e0ec94 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java @@ -30,16 +30,19 @@ import org.apache.paimon.disk.ExternalBuffer; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.SimpleFileEntry; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.table.sink.StreamTableCommit; import org.apache.paimon.types.DataTypes; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Random; @@ -170,4 +173,32 @@ public class AppendOnlyFileStoreWriteTest { writer.complete(); return binaryRow; } + + @Test + public void testScanFilterWithNullPartition() throws Exception { + FileStoreTable table = createFileStoreTable(); + + AppendOnlyFileStoreWrite write = (AppendOnlyFileStoreWrite) table.store().newWrite("ss"); + StreamTableCommit commit = table.newStreamWriteBuilder().newCommit(); + write.withExecutionMode(false); + + for (int i = 0; i < 100; i++) { + write.write(nullPartition(), i, GenericRow.of(null, i, i)); + commit.commit(i, write.prepareCommit(false, i)); + } + + BinaryRow binaryRow = nullPartition(); + FileStoreScan scan = table.store().newScan(); + List<SimpleFileEntry> l0 = + scan.withPartitionFilter(Collections.singletonList(binaryRow)).readSimpleEntries(); + Assertions.assertThat(l0.size()).isEqualTo(100); + } + + private BinaryRow nullPartition() { + BinaryRow binaryRow = new BinaryRow(1); + BinaryRowWriter writer = new BinaryRowWriter(binaryRow); + writer.setNullAt(0); + writer.complete(); + return binaryRow; + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index 6a31c432f..3f81fdf9a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -258,9 +258,8 @@ public class FileStoreLookupFunction implements Serializable, Closeable { List<Predicate> predicates = new ArrayList<>(); for (int i = 0; i < partitionKeys.size(); i++) { int index = fieldNames.indexOf(partitionKeys.get(i)); - predicates.add( - builder.equal( - index, InternalRowUtils.get(partition, i, rowType.getTypeAt(index)))); + Object value = InternalRowUtils.get(partition, i, rowType.getTypeAt(index)); + predicates.add(value == null ? builder.isNull(index) : builder.equal(index, value)); } return PredicateBuilder.and(predicates); }
