This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a3959be6 [core] Fix null Partition values to Predicate conversion (#3421)
1a3959be6 is described below

commit 1a3959be6670888a2eef8bd3ef0dd0b54c5f2c83
Author: YeJunHao <[email protected]>
AuthorDate: Wed May 29 16:30:25 2024 +0800

    [core] Fix null Partition values to Predicate conversion (#3421)
---
 .../apache/paimon/predicate/PredicateBuilder.java  | 12 ++++++---
 .../utils/RowDataToObjectArrayConverter.java       |  2 +-
 .../paimon/partition/PartitionPredicate.java       |  7 +++--
 .../table/source/snapshot/SnapshotReaderImpl.java  |  4 ++-
 .../operation/AppendOnlyFileStoreWriteTest.java    | 31 ++++++++++++++++++++++
 .../flink/lookup/FileStoreLookupFunction.java      |  5 ++--
 6 files changed, 50 insertions(+), 11 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
index f6410adca..2f50c05eb 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
@@ -368,10 +368,12 @@ public class PredicateBuilder {
         for (Map.Entry<String, String> entry : map.entrySet()) {
             int idx = fieldNames.indexOf(entry.getKey());
             Object literal = TypeUtils.castFromString(entry.getValue(), rowType.getTypeAt(idx));
+            Predicate predicateTemp =
+                    literal == null ? builder.isNull(idx) : builder.equal(idx, literal);
             if (predicate == null) {
-                predicate = builder.equal(idx, literal);
+                predicate = predicateTemp;
             } else {
-                predicate = PredicateBuilder.and(predicate, builder.equal(idx, literal));
+                predicate = PredicateBuilder.and(predicate, predicateTemp);
             }
         }
         return predicate;
@@ -394,10 +396,12 @@ public class PredicateBuilder {
         PredicateBuilder builder = new PredicateBuilder(partitionType);
         Object[] literals = converter.convert(partition);
         for (int i = 0; i < literals.length; i++) {
+            Predicate predicateTemp =
+                    literals[i] == null ? builder.isNull(i) : builder.equal(i, literals[i]);
             if (predicate == null) {
-                predicate = builder.equal(i, literals[i]);
+                predicate = predicateTemp;
             } else {
-                predicate = PredicateBuilder.and(predicate, builder.equal(i, literals[i]));
+                predicate = PredicateBuilder.and(predicate, predicateTemp);
             }
         }
 
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java b/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
index a18ec4249..436a51ada 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
@@ -75,7 +75,7 @@ public class RowDataToObjectArrayConverter implements Serializable {
         Object[] partitionObjects = convert(binaryRow);
         for (int i = 0; i < getArity(); i++) {
             Object o = partitionObjects[i];
-            fieldPredicates.add(builder.equal(i, o));
+            fieldPredicates.add(o == null ? builder.isNull(i) : builder.equal(i, o));
         }
         return PredicateBuilder.and(fieldPredicates);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
index 681cbfc80..b8893db6b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
@@ -121,8 +121,11 @@ public interface PartitionPredicate {
             PredicateBuilder builder = new PredicateBuilder(partitionType);
             for (int i = 0; i < collectors.length; i++) {
                 SimpleColStats stats = collectors[i].result();
-                min[i] = builder.greaterOrEqual(i, stats.min());
-                max[i] = builder.lessOrEqual(i, stats.max());
+                Object minValue = stats.min();
+                Object maxValue = stats.max();
+
+                min[i] = minValue == null ? builder.isNull(i) : builder.greaterOrEqual(i, minValue);
+                max[i] = maxValue == null ? builder.isNull(i) : builder.lessOrEqual(i, maxValue);
             }
         }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index ffb52ce6f..a6082bdcd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -153,7 +153,9 @@ public class SnapshotReaderImpl implements SnapshotReader {
                                                         m.getValue(),
                                                         rowType.getTypeAt(index),
                                                         false);
-                                        return predicateBuilder.equal(index, value);
+                                        return value == null
+                                                ? predicateBuilder.isNull(index)
+                                                : predicateBuilder.equal(index, value);
                                     })
                             .collect(Collectors.toList());
             scan.withPartitionFilter(PredicateBuilder.and(partitionFilters));
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java
index 0fa8861ae..eb1f208bd 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java
@@ -30,16 +30,19 @@ import org.apache.paimon.disk.ExternalBuffer;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.SimpleFileEntry;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.types.DataTypes;
 
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -171,4 +174,32 @@ public class AppendOnlyFileStoreWriteTest {
         writer.complete();
         return binaryRow;
     }
+
+    @Test
+    public void testScanFilterWithNullPartition() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        AppendOnlyFileStoreWrite write = (AppendOnlyFileStoreWrite) table.store().newWrite("ss");
+        StreamTableCommit commit = table.newStreamWriteBuilder().newCommit();
+        write.withExecutionMode(false);
+
+        for (int i = 0; i < 100; i++) {
+            write.write(nullPartition(), i, GenericRow.of(null, i, i));
+            commit.commit(i, write.prepareCommit(false, i));
+        }
+
+        BinaryRow binaryRow = nullPartition();
+        FileStoreScan scan = table.store().newScan();
+        List<SimpleFileEntry> l0 =
+                scan.withPartitionFilter(Collections.singletonList(binaryRow)).readSimpleEntries();
+        Assertions.assertThat(l0.size()).isEqualTo(100);
+    }
+
+    private BinaryRow nullPartition() {
+        BinaryRow binaryRow = new BinaryRow(1);
+        BinaryRowWriter writer = new BinaryRowWriter(binaryRow);
+        writer.setNullAt(0);
+        writer.complete();
+        return binaryRow;
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 6a31c432f..3f81fdf9a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -258,9 +258,8 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
         List<Predicate> predicates = new ArrayList<>();
         for (int i = 0; i < partitionKeys.size(); i++) {
             int index = fieldNames.indexOf(partitionKeys.get(i));
-            predicates.add(
-                    builder.equal(
-                            index, InternalRowUtils.get(partition, i, rowType.getTypeAt(index))));
+            Object value = InternalRowUtils.get(partition, i, rowType.getTypeAt(index));
+            predicates.add(value == null ? builder.isNull(index) : builder.equal(index, value));
         }
         return PredicateBuilder.and(predicates);
     }

Reply via email to