This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1a3959be6 [core] Fix null Partition values to Predicate conversion (#3421)
1a3959be6 is described below
commit 1a3959be6670888a2eef8bd3ef0dd0b54c5f2c83
Author: YeJunHao <[email protected]>
AuthorDate: Wed May 29 16:30:25 2024 +0800
[core] Fix null Partition values to Predicate conversion (#3421)
---
.../apache/paimon/predicate/PredicateBuilder.java | 12 ++++++---
.../utils/RowDataToObjectArrayConverter.java | 2 +-
.../paimon/partition/PartitionPredicate.java | 7 +++--
.../table/source/snapshot/SnapshotReaderImpl.java | 4 ++-
.../operation/AppendOnlyFileStoreWriteTest.java | 31 ++++++++++++++++++++++
.../flink/lookup/FileStoreLookupFunction.java | 5 ++--
6 files changed, 50 insertions(+), 11 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
index f6410adca..2f50c05eb 100644
---
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
+++
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
@@ -368,10 +368,12 @@ public class PredicateBuilder {
for (Map.Entry<String, String> entry : map.entrySet()) {
int idx = fieldNames.indexOf(entry.getKey());
Object literal = TypeUtils.castFromString(entry.getValue(),
rowType.getTypeAt(idx));
+ Predicate predicateTemp =
+ literal == null ? builder.isNull(idx) : builder.equal(idx,
literal);
if (predicate == null) {
- predicate = builder.equal(idx, literal);
+ predicate = predicateTemp;
} else {
- predicate = PredicateBuilder.and(predicate, builder.equal(idx,
literal));
+ predicate = PredicateBuilder.and(predicate, predicateTemp);
}
}
return predicate;
@@ -394,10 +396,12 @@ public class PredicateBuilder {
PredicateBuilder builder = new PredicateBuilder(partitionType);
Object[] literals = converter.convert(partition);
for (int i = 0; i < literals.length; i++) {
+ Predicate predicateTemp =
+ literals[i] == null ? builder.isNull(i) : builder.equal(i,
literals[i]);
if (predicate == null) {
- predicate = builder.equal(i, literals[i]);
+ predicate = predicateTemp;
} else {
- predicate = PredicateBuilder.and(predicate, builder.equal(i,
literals[i]));
+ predicate = PredicateBuilder.and(predicate, predicateTemp);
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
b/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
index a18ec4249..436a51ada 100644
---
a/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
+++
b/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
@@ -75,7 +75,7 @@ public class RowDataToObjectArrayConverter implements
Serializable {
Object[] partitionObjects = convert(binaryRow);
for (int i = 0; i < getArity(); i++) {
Object o = partitionObjects[i];
- fieldPredicates.add(builder.equal(i, o));
+ fieldPredicates.add(o == null ? builder.isNull(i) :
builder.equal(i, o));
}
return PredicateBuilder.and(fieldPredicates);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
index 681cbfc80..b8893db6b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
+++
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
@@ -121,8 +121,11 @@ public interface PartitionPredicate {
PredicateBuilder builder = new PredicateBuilder(partitionType);
for (int i = 0; i < collectors.length; i++) {
SimpleColStats stats = collectors[i].result();
- min[i] = builder.greaterOrEqual(i, stats.min());
- max[i] = builder.lessOrEqual(i, stats.max());
+ Object minValue = stats.min();
+ Object maxValue = stats.max();
+
+ min[i] = minValue == null ? builder.isNull(i) :
builder.greaterOrEqual(i, minValue);
+ max[i] = maxValue == null ? builder.isNull(i) :
builder.lessOrEqual(i, maxValue);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index ffb52ce6f..a6082bdcd 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -153,7 +153,9 @@ public class SnapshotReaderImpl implements SnapshotReader {
m.getValue(),
rowType.getTypeAt(index),
false);
- return predicateBuilder.equal(index,
value);
+ return value == null
+ ?
predicateBuilder.isNull(index)
+ :
predicateBuilder.equal(index, value);
})
.collect(Collectors.toList());
scan.withPartitionFilter(PredicateBuilder.and(partitionFilters));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java
index 0fa8861ae..eb1f208bd 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java
@@ -30,16 +30,19 @@ import org.apache.paimon.disk.ExternalBuffer;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.types.DataTypes;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -171,4 +174,32 @@ public class AppendOnlyFileStoreWriteTest {
writer.complete();
return binaryRow;
}
+
+ @Test
+ public void testScanFilterWithNullPartition() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+
+ AppendOnlyFileStoreWrite write = (AppendOnlyFileStoreWrite)
table.store().newWrite("ss");
+ StreamTableCommit commit = table.newStreamWriteBuilder().newCommit();
+ write.withExecutionMode(false);
+
+ for (int i = 0; i < 100; i++) {
+ write.write(nullPartition(), i, GenericRow.of(null, i, i));
+ commit.commit(i, write.prepareCommit(false, i));
+ }
+
+ BinaryRow binaryRow = nullPartition();
+ FileStoreScan scan = table.store().newScan();
+ List<SimpleFileEntry> l0 =
+
scan.withPartitionFilter(Collections.singletonList(binaryRow)).readSimpleEntries();
+ Assertions.assertThat(l0.size()).isEqualTo(100);
+ }
+
+ private BinaryRow nullPartition() {
+ BinaryRow binaryRow = new BinaryRow(1);
+ BinaryRowWriter writer = new BinaryRowWriter(binaryRow);
+ writer.setNullAt(0);
+ writer.complete();
+ return binaryRow;
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 6a31c432f..3f81fdf9a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -258,9 +258,8 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
List<Predicate> predicates = new ArrayList<>();
for (int i = 0; i < partitionKeys.size(); i++) {
int index = fieldNames.indexOf(partitionKeys.get(i));
- predicates.add(
- builder.equal(
- index, InternalRowUtils.get(partition, i,
rowType.getTypeAt(index))));
+ Object value = InternalRowUtils.get(partition, i,
rowType.getTypeAt(index));
+ predicates.add(value == null ? builder.isNull(index) :
builder.equal(index, value));
}
return PredicateBuilder.and(predicates);
}