This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 73a172fffd [core] Refactor format scan optimize to simplify predicates (#7132)
73a172fffd is described below

commit 73a172fffdc632f27a287940357d70f3eaae08a3
Author: YeJunHao <[email protected]>
AuthorDate: Tue Jan 27 19:09:59 2026 +0800

    [core] Refactor format scan optimize to simplify predicates (#7132)
---
 .../org/apache/paimon/predicate/LeafPredicate.java |  6 +-
 .../paimon/predicate/NullableLeafPredicate.java    | 71 ----------------
 .../paimon/table/format/FormatTableScan.java       | 12 +--
 .../table/format/predicate/PredicateUtils.java     | 96 ++++++++++++++++++++++
 .../apache/paimon/utils/PartitionPathUtils.java    | 45 +++++-----
 .../table/format/predicate/PredicateUtilsTest.java | 77 +++++++++++++++++
 6 files changed, 201 insertions(+), 106 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java
index 13ec8c227e..5e5b386311 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java
@@ -56,12 +56,12 @@ public class LeafPredicate implements Predicate {
     public static final String FIELD_LITERALS = "literals";
 
     @JsonProperty(FIELD_TRANSFORM)
-    protected final Transform transform;
+    private final Transform transform;
 
     @JsonProperty(FIELD_FUNCTION)
-    protected final LeafFunction function;
+    private final LeafFunction function;
 
-    protected transient List<Object> literals;
+    private transient List<Object> literals;
 
     public LeafPredicate(
             LeafFunction function,
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/NullableLeafPredicate.java b/paimon-common/src/main/java/org/apache/paimon/predicate/NullableLeafPredicate.java
deleted file mode 100644
index a721804bd4..0000000000
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/NullableLeafPredicate.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.predicate;
-
-import org.apache.paimon.data.InternalRow;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-/** A {@link LeafPredicate} that handles nullable values. */
-public class NullableLeafPredicate extends LeafPredicate {
-
-    public NullableLeafPredicate(
-            Transform transform, LeafFunction function, List<Object> literals) {
-        super(transform, function, literals);
-    }
-
-    @Override
-    public boolean test(InternalRow row) {
-        Object value = transform.transform(row);
-        if (value == null) {
-            return true;
-        }
-        return function.test(transform.outputType(), value, literals);
-    }
-
-    public static NullableLeafPredicate fromLeafPredicate(LeafPredicate leafPredicate) {
-        return new NullableLeafPredicate(
-                leafPredicate.transform(), leafPredicate.function(), leafPredicate.literals());
-    }
-
-    public static Predicate from(Predicate predicate) {
-        return predicate.visit(Visitor.INSTANCE);
-    }
-
-    private static class Visitor implements PredicateVisitor<Predicate> {
-
-        private static final Visitor INSTANCE = new Visitor();
-
-        @Override
-        public Predicate visit(LeafPredicate predicate) {
-            return fromLeafPredicate(predicate);
-        }
-
-        @Override
-        public Predicate visit(CompoundPredicate predicate) {
-            CompoundPredicate.Function function = predicate.function();
-            List<Predicate> newChildren =
-                    predicate.children().stream()
-                            .map(child -> child.visit(this))
-                            .collect(Collectors.toList());
-            return new CompoundPredicate(function, newChildren);
-        }
-    }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
index 5d7b574c89..9a561a6bd1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
@@ -36,10 +36,10 @@ import org.apache.paimon.predicate.Equal;
 import org.apache.paimon.predicate.FieldRef;
 import org.apache.paimon.predicate.LeafFunction;
 import org.apache.paimon.predicate.LeafPredicate;
-import org.apache.paimon.predicate.NullableLeafPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.table.format.predicate.PredicateUtils;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableScan;
@@ -179,11 +179,11 @@ public class FormatTableScan implements InnerTableScan {
             // search paths with partition filter optimization
             // This will prune partition directories early during traversal,
             // which is especially important for cloud storage like OSS/S3
-            Predicate predicate = null;
+            Map<String, Predicate> partitionPredicates = new HashMap<>();
             if (partitionFilter instanceof DefaultPartitionPredicate) {
-                predicate = ((DefaultPartitionPredicate) partitionFilter).predicate();
-
-                predicate = NullableLeafPredicate.from(predicate);
+                Predicate predicate = ((DefaultPartitionPredicate) partitionFilter).predicate();
+                partitionPredicates =
+                        PredicateUtils.splitPartitionPredicate(table.partitionType(), predicate);
             }
 
             Pair<Path, Integer> scanPathAndLevel =
@@ -199,7 +199,7 @@ public class FormatTableScan implements InnerTableScan {
                     scanPathAndLevel.getRight(),
                     table.partitionKeys(),
                     onlyValueInPath,
-                    predicate,
+                    partitionPredicates,
                     table.partitionType(),
                     table.defaultPartName());
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/predicate/PredicateUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/format/predicate/PredicateUtils.java
new file mode 100644
index 0000000000..93ad6b9010
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/predicate/PredicateUtils.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format.predicate;
+
+import org.apache.paimon.predicate.CompoundPredicate;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.PredicateVisitor;
+import org.apache.paimon.types.RowType;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
+
+/** Utility methods for working with {@link Predicate}s. */
+public class PredicateUtils {
+
+    // Use splitAnd split the predicate, then group them by the partition fields.
+    public static Map<String, Predicate> splitPartitionPredicate(
+            RowType partitionType, Predicate predicate) {
+        int[] fieldMap = new int[partitionType.getFields().size()];
+        Arrays.fill(fieldMap, 0);
+        List<Predicate> predicates = PredicateBuilder.splitAnd(predicate);
+        Set<String> partitionFieldNames = new HashSet<>(partitionType.getFieldNames());
+        Map<String, Predicate> result = new HashMap<>();
+
+        for (Predicate sub : predicates) {
+            // Collect all field names referenced by this predicate
+            Set<String> referencedFields = sub.visit(new FieldNameCollector());
+            Optional<Predicate> transformed = transformFieldMapping(sub, fieldMap);
+            if (transformed.isPresent() && referencedFields.size() == 1) {
+                Predicate child = transformed.get();
+                // Only include predicates that reference exactly one partition field
+                String fieldName = referencedFields.iterator().next();
+                if (partitionFieldNames.contains(fieldName)) {
+                    if (result.containsKey(fieldName)) {
+                        // Combine with existing predicate using AND
+                        result.put(fieldName, PredicateBuilder.and(result.get(fieldName), child));
+                    } else {
+                        result.put(fieldName, child);
+                    }
+                }
+            }
+        }
+
+        return result;
+    }
+
+    /** A visitor that collects all field names referenced by a predicate. */
+    private static class FieldNameCollector implements PredicateVisitor<Set<String>> {
+
+        @Override
+        public Set<String> visit(LeafPredicate predicate) {
+            Set<String> fieldNames = new HashSet<>();
+            for (Object input : predicate.transform().inputs()) {
+                if (input instanceof FieldRef) {
+                    fieldNames.add(((FieldRef) input).name());
+                }
+            }
+            return fieldNames;
+        }
+
+        @Override
+        public Set<String> visit(CompoundPredicate predicate) {
+            Set<String> fieldNames = new HashSet<>();
+            for (Predicate child : predicate.children()) {
+                fieldNames.addAll(child.visit(this));
+            }
+            return fieldNames;
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
index a76af18331..c20abf7f34 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
@@ -31,6 +31,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -279,7 +280,14 @@ public class PartitionPathUtils {
             List<String> partitionKeys,
             boolean onlyValueInPath) {
         return searchPartSpecAndPaths(
-                fileIO, path, partitionNumber, partitionKeys, onlyValueInPath, null, null, null);
+                fileIO,
+                path,
+                partitionNumber,
+                partitionKeys,
+                onlyValueInPath,
+                Collections.emptyMap(),
+                null,
+                null);
     }
 
     public static List<Pair<LinkedHashMap<String, String>, Path>> searchPartSpecAndPaths(
@@ -288,7 +296,7 @@ public class PartitionPathUtils {
             int partitionNumber,
             List<String> partitionKeys,
             boolean onlyValueInPath,
-            @Nullable Predicate partitionFilter,
+            Map<String, Predicate> partitionFilter,
             @Nullable RowType partitionType,
             @Nullable String defaultPartValue) {
         FileStatus[] generatedParts =
@@ -331,7 +339,7 @@ public class PartitionPathUtils {
             FileIO fileIO,
             List<String> partitionKeys,
             boolean onlyValueInPath,
-            @Nullable Predicate partitionFilter,
+            Map<String, Predicate> partitionFilter,
             @Nullable RowType partitionType,
             @Nullable String defaultPartValue) {
         ArrayList<FileStatus> result = new ArrayList<>();
@@ -340,9 +348,6 @@ public class PartitionPathUtils {
             if (fileIO.exists(path)) {
                 // ignore hidden file
                 FileStatus fileStatus = fileIO.getFileStatus(path);
-                // Create an array to hold accumulated partition values at each level
-                Object[] partitionValues =
-                        partitionFilter != null ? new Object[partitionKeys.size()] : null;
                 // Calculate the starting offset when we begin from a prefix path
                 // For example, if partitionKeys = [ds, hr] and expectLevel = 1 (only hr remaining),
                 // then levelOffset = 2 - 1 = 1, so we access partitionKeys[1] for level 0
@@ -358,7 +363,6 @@ public class PartitionPathUtils {
                         partitionFilter,
                         partitionType,
                         defaultPartValue,
-                        partitionValues,
                         levelOffset);
             } else {
                 return new FileStatus[0];
@@ -378,10 +382,9 @@ public class PartitionPathUtils {
             List<FileStatus> results,
             List<String> partitionKeys,
             boolean onlyValueInPath,
-            @Nullable Predicate partitionFilter,
+            Map<String, Predicate> partitionFilter,
             @Nullable RowType partitionType,
             @Nullable String defaultPartValue,
-            @Nullable Object[] partitionValues,
             int levelOffset)
             throws IOException {
         if (isHiddenFile(fileStatus.getPath())) {
@@ -401,10 +404,11 @@ public class PartitionPathUtils {
                 int partitionKeyIndex = levelOffset + level;
 
                 // Apply partition filter if available
-                if (partitionFilter != null
-                        && partitionType != null
-                        && partitionValues != null
-                        && partitionKeyIndex < partitionKeys.size()) {
+                if (partitionFilter.containsKey(partitionKeys.get(partitionKeyIndex))
+                        && partitionType != null) {
+
+                    Predicate partitionPredicate =
+                            partitionFilter.get(partitionKeys.get(partitionKeyIndex));
                     // Extract the partition value from the directory name
                     String dirName = stat.getPath().getName();
                     String partitionKey = partitionKeys.get(partitionKeyIndex);
@@ -438,17 +442,8 @@ public class PartitionPathUtils {
                                                 partitionValue,
                                                 partitionType.getTypeAt(partitionKeyIndex));
 
-                        // Create a copy of partition values and set the current partition key value
-                        Object[] currentPartitionValues = partitionValues.clone();
-                        currentPartitionValues[partitionKeyIndex] = internalValue;
-
-                        // Build a partial row with the accumulated partition values
-                        GenericRow partialRow = new GenericRow(partitionKeys.size());
-                        for (int i = 0; i <= partitionKeyIndex; i++) {
-                            partialRow.setField(i, currentPartitionValues[i]);
-                        }
-
-                        if (!partitionFilter.test(partialRow)) {
+                        GenericRow partialRow = GenericRow.of(internalValue);
+                        if (!partitionPredicate.test(partialRow)) {
                             continue;
                         }
 
@@ -464,7 +459,6 @@ public class PartitionPathUtils {
                                 partitionFilter,
                                 partitionType,
                                 defaultPartValue,
-                                currentPartitionValues,
                                 levelOffset);
                         continue;
                     }
@@ -481,7 +475,6 @@ public class PartitionPathUtils {
                         partitionFilter,
                         partitionType,
                         defaultPartValue,
-                        partitionValues,
                         levelOffset);
             }
         }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/format/predicate/PredicateUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/table/format/predicate/PredicateUtilsTest.java
new file mode 100644
index 0000000000..2713dae75b
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/format/predicate/PredicateUtilsTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format.predicate;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link PredicateUtils}. */
+public class PredicateUtilsTest {
+
+    private final RowType partitionType =
+            RowType.builder().field("dt", DataTypes.INT()).field("hr", DataTypes.INT()).build();
+
+    private final RowType fullRowType =
+            RowType.builder()
+                    .field("dt", DataTypes.INT())
+                    .field("hr", DataTypes.INT())
+                    .field("name", DataTypes.STRING())
+                    .field("value", DataTypes.INT())
+                    .build();
+
+    @Test
+    public void testMultiplePredicatesOnSamePartitionField() {
+        PredicateBuilder builder = new PredicateBuilder(fullRowType);
+        // hr > 10 AND hr < 20
+        Predicate hrGreater = builder.greaterThan(1, 10);
+        Predicate hrLess = builder.lessThan(1, 20);
+        Predicate dtGreater = builder.greaterThan(0, 0);
+        Predicate dtLess = builder.lessThan(0, 10);
+
+        Predicate predicate = PredicateBuilder.and(hrGreater, hrLess, dtGreater, dtLess);
+
+        Map<String, Predicate> result =
+                PredicateUtils.splitPartitionPredicate(partitionType, predicate);
+
+        assertThat(result).hasSize(2);
+        assertThat(result).containsKey("hr");
+        assertThat(result).containsKey("dt");
+        // Should combine with AND
+        Predicate combined = result.get("hr");
+        // Test that the combined predicate works correctly
+        assertThat(combined.test(GenericRow.of(15))).isTrue();
+        assertThat(combined.test(GenericRow.of(5))).isFalse();
+        assertThat(combined.test(GenericRow.of(25))).isFalse();
+
+        combined = result.get("dt");
+        // Test that the combined predicate works correctly
+        assertThat(combined.test(GenericRow.of(5))).isTrue();
+        assertThat(combined.test(GenericRow.of(11))).isFalse();
+        assertThat(combined.test(GenericRow.of(-1))).isFalse();
+    }
+}

Reply via email to