This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 73a172fffd [core] Refactor format scan optimize to simplify predicates
(#7132)
73a172fffd is described below
commit 73a172fffdc632f27a287940357d70f3eaae08a3
Author: YeJunHao <[email protected]>
AuthorDate: Tue Jan 27 19:09:59 2026 +0800
[core] Refactor format scan optimize to simplify predicates (#7132)
---
.../org/apache/paimon/predicate/LeafPredicate.java | 6 +-
.../paimon/predicate/NullableLeafPredicate.java | 71 ----------------
.../paimon/table/format/FormatTableScan.java | 12 +--
.../table/format/predicate/PredicateUtils.java | 96 ++++++++++++++++++++++
.../apache/paimon/utils/PartitionPathUtils.java | 45 +++++-----
.../table/format/predicate/PredicateUtilsTest.java | 77 +++++++++++++++++
6 files changed, 201 insertions(+), 106 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java
index 13ec8c227e..5e5b386311 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java
@@ -56,12 +56,12 @@ public class LeafPredicate implements Predicate {
public static final String FIELD_LITERALS = "literals";
@JsonProperty(FIELD_TRANSFORM)
- protected final Transform transform;
+ private final Transform transform;
@JsonProperty(FIELD_FUNCTION)
- protected final LeafFunction function;
+ private final LeafFunction function;
- protected transient List<Object> literals;
+ private transient List<Object> literals;
public LeafPredicate(
LeafFunction function,
diff --git
a/paimon-common/src/main/java/org/apache/paimon/predicate/NullableLeafPredicate.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/NullableLeafPredicate.java
deleted file mode 100644
index a721804bd4..0000000000
---
a/paimon-common/src/main/java/org/apache/paimon/predicate/NullableLeafPredicate.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.predicate;
-
-import org.apache.paimon.data.InternalRow;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-/** A {@link LeafPredicate} that handles nullable values. */
-public class NullableLeafPredicate extends LeafPredicate {
-
- public NullableLeafPredicate(
- Transform transform, LeafFunction function, List<Object> literals)
{
- super(transform, function, literals);
- }
-
- @Override
- public boolean test(InternalRow row) {
- Object value = transform.transform(row);
- if (value == null) {
- return true;
- }
- return function.test(transform.outputType(), value, literals);
- }
-
- public static NullableLeafPredicate fromLeafPredicate(LeafPredicate
leafPredicate) {
- return new NullableLeafPredicate(
- leafPredicate.transform(), leafPredicate.function(),
leafPredicate.literals());
- }
-
- public static Predicate from(Predicate predicate) {
- return predicate.visit(Visitor.INSTANCE);
- }
-
- private static class Visitor implements PredicateVisitor<Predicate> {
-
- private static final Visitor INSTANCE = new Visitor();
-
- @Override
- public Predicate visit(LeafPredicate predicate) {
- return fromLeafPredicate(predicate);
- }
-
- @Override
- public Predicate visit(CompoundPredicate predicate) {
- CompoundPredicate.Function function = predicate.function();
- List<Predicate> newChildren =
- predicate.children().stream()
- .map(child -> child.visit(this))
- .collect(Collectors.toList());
- return new CompoundPredicate(function, newChildren);
- }
- }
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
index 5d7b574c89..9a561a6bd1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
@@ -36,10 +36,10 @@ import org.apache.paimon.predicate.Equal;
import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.predicate.LeafFunction;
import org.apache.paimon.predicate.LeafPredicate;
-import org.apache.paimon.predicate.NullableLeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.table.format.predicate.PredicateUtils;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableScan;
@@ -179,11 +179,11 @@ public class FormatTableScan implements InnerTableScan {
// search paths with partition filter optimization
// This will prune partition directories early during traversal,
// which is especially important for cloud storage like OSS/S3
- Predicate predicate = null;
+ Map<String, Predicate> partitionPredicates = new HashMap<>();
if (partitionFilter instanceof DefaultPartitionPredicate) {
- predicate = ((DefaultPartitionPredicate)
partitionFilter).predicate();
-
- predicate = NullableLeafPredicate.from(predicate);
+ Predicate predicate = ((DefaultPartitionPredicate)
partitionFilter).predicate();
+ partitionPredicates =
+
PredicateUtils.splitPartitionPredicate(table.partitionType(), predicate);
}
Pair<Path, Integer> scanPathAndLevel =
@@ -199,7 +199,7 @@ public class FormatTableScan implements InnerTableScan {
scanPathAndLevel.getRight(),
table.partitionKeys(),
onlyValueInPath,
- predicate,
+ partitionPredicates,
table.partitionType(),
table.defaultPartName());
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/format/predicate/PredicateUtils.java
b/paimon-core/src/main/java/org/apache/paimon/table/format/predicate/PredicateUtils.java
new file mode 100644
index 0000000000..93ad6b9010
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/format/predicate/PredicateUtils.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format.predicate;
+
+import org.apache.paimon.predicate.CompoundPredicate;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.PredicateVisitor;
+import org.apache.paimon.types.RowType;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static
org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
+
+/** Utility methods for working with {@link Predicate}s. */
+public class PredicateUtils {
+
+ // Split the predicate with splitAnd, then group the sub-predicates by partition
field.
+ public static Map<String, Predicate> splitPartitionPredicate(
+ RowType partitionType, Predicate predicate) {
+ int[] fieldMap = new int[partitionType.getFields().size()];
+ Arrays.fill(fieldMap, 0);
+ List<Predicate> predicates = PredicateBuilder.splitAnd(predicate);
+ Set<String> partitionFieldNames = new
HashSet<>(partitionType.getFieldNames());
+ Map<String, Predicate> result = new HashMap<>();
+
+ for (Predicate sub : predicates) {
+ // Collect all field names referenced by this predicate
+ Set<String> referencedFields = sub.visit(new FieldNameCollector());
+ Optional<Predicate> transformed = transformFieldMapping(sub,
fieldMap);
+ if (transformed.isPresent() && referencedFields.size() == 1) {
+ Predicate child = transformed.get();
+ // Only include predicates that reference exactly one
partition field
+ String fieldName = referencedFields.iterator().next();
+ if (partitionFieldNames.contains(fieldName)) {
+ if (result.containsKey(fieldName)) {
+ // Combine with existing predicate using AND
+ result.put(fieldName,
PredicateBuilder.and(result.get(fieldName), child));
+ } else {
+ result.put(fieldName, child);
+ }
+ }
+ }
+ }
+
+ return result;
+ }
+
+ /** A visitor that collects all field names referenced by a predicate. */
+ private static class FieldNameCollector implements
PredicateVisitor<Set<String>> {
+
+ @Override
+ public Set<String> visit(LeafPredicate predicate) {
+ Set<String> fieldNames = new HashSet<>();
+ for (Object input : predicate.transform().inputs()) {
+ if (input instanceof FieldRef) {
+ fieldNames.add(((FieldRef) input).name());
+ }
+ }
+ return fieldNames;
+ }
+
+ @Override
+ public Set<String> visit(CompoundPredicate predicate) {
+ Set<String> fieldNames = new HashSet<>();
+ for (Predicate child : predicate.children()) {
+ fieldNames.addAll(child.visit(this));
+ }
+ return fieldNames;
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
index a76af18331..c20abf7f34 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java
@@ -31,6 +31,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -279,7 +280,14 @@ public class PartitionPathUtils {
List<String> partitionKeys,
boolean onlyValueInPath) {
return searchPartSpecAndPaths(
- fileIO, path, partitionNumber, partitionKeys, onlyValueInPath,
null, null, null);
+ fileIO,
+ path,
+ partitionNumber,
+ partitionKeys,
+ onlyValueInPath,
+ Collections.emptyMap(),
+ null,
+ null);
}
public static List<Pair<LinkedHashMap<String, String>, Path>>
searchPartSpecAndPaths(
@@ -288,7 +296,7 @@ public class PartitionPathUtils {
int partitionNumber,
List<String> partitionKeys,
boolean onlyValueInPath,
- @Nullable Predicate partitionFilter,
+ Map<String, Predicate> partitionFilter,
@Nullable RowType partitionType,
@Nullable String defaultPartValue) {
FileStatus[] generatedParts =
@@ -331,7 +339,7 @@ public class PartitionPathUtils {
FileIO fileIO,
List<String> partitionKeys,
boolean onlyValueInPath,
- @Nullable Predicate partitionFilter,
+ Map<String, Predicate> partitionFilter,
@Nullable RowType partitionType,
@Nullable String defaultPartValue) {
ArrayList<FileStatus> result = new ArrayList<>();
@@ -340,9 +348,6 @@ public class PartitionPathUtils {
if (fileIO.exists(path)) {
// ignore hidden file
FileStatus fileStatus = fileIO.getFileStatus(path);
- // Create an array to hold accumulated partition values at
each level
- Object[] partitionValues =
- partitionFilter != null ? new
Object[partitionKeys.size()] : null;
// Calculate the starting offset when we begin from a prefix
path
// For example, if partitionKeys = [ds, hr] and expectLevel =
1 (only hr remaining),
// then levelOffset = 2 - 1 = 1, so we access partitionKeys[1]
for level 0
@@ -358,7 +363,6 @@ public class PartitionPathUtils {
partitionFilter,
partitionType,
defaultPartValue,
- partitionValues,
levelOffset);
} else {
return new FileStatus[0];
@@ -378,10 +382,9 @@ public class PartitionPathUtils {
List<FileStatus> results,
List<String> partitionKeys,
boolean onlyValueInPath,
- @Nullable Predicate partitionFilter,
+ Map<String, Predicate> partitionFilter,
@Nullable RowType partitionType,
@Nullable String defaultPartValue,
- @Nullable Object[] partitionValues,
int levelOffset)
throws IOException {
if (isHiddenFile(fileStatus.getPath())) {
@@ -401,10 +404,11 @@ public class PartitionPathUtils {
int partitionKeyIndex = levelOffset + level;
// Apply partition filter if available
- if (partitionFilter != null
- && partitionType != null
- && partitionValues != null
- && partitionKeyIndex < partitionKeys.size()) {
+ if
(partitionFilter.containsKey(partitionKeys.get(partitionKeyIndex))
+ && partitionType != null) {
+
+ Predicate partitionPredicate =
+
partitionFilter.get(partitionKeys.get(partitionKeyIndex));
// Extract the partition value from the directory name
String dirName = stat.getPath().getName();
String partitionKey = partitionKeys.get(partitionKeyIndex);
@@ -438,17 +442,8 @@ public class PartitionPathUtils {
partitionValue,
partitionType.getTypeAt(partitionKeyIndex));
- // Create a copy of partition values and set the
current partition key value
- Object[] currentPartitionValues =
partitionValues.clone();
- currentPartitionValues[partitionKeyIndex] =
internalValue;
-
- // Build a partial row with the accumulated partition
values
- GenericRow partialRow = new
GenericRow(partitionKeys.size());
- for (int i = 0; i <= partitionKeyIndex; i++) {
- partialRow.setField(i, currentPartitionValues[i]);
- }
-
- if (!partitionFilter.test(partialRow)) {
+ GenericRow partialRow = GenericRow.of(internalValue);
+ if (!partitionPredicate.test(partialRow)) {
continue;
}
@@ -464,7 +459,6 @@ public class PartitionPathUtils {
partitionFilter,
partitionType,
defaultPartValue,
- currentPartitionValues,
levelOffset);
continue;
}
@@ -481,7 +475,6 @@ public class PartitionPathUtils {
partitionFilter,
partitionType,
defaultPartValue,
- partitionValues,
levelOffset);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/format/predicate/PredicateUtilsTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/format/predicate/PredicateUtilsTest.java
new file mode 100644
index 0000000000..2713dae75b
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/table/format/predicate/PredicateUtilsTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format.predicate;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link PredicateUtils}. */
+public class PredicateUtilsTest {
+
+ private final RowType partitionType =
+ RowType.builder().field("dt", DataTypes.INT()).field("hr",
DataTypes.INT()).build();
+
+ private final RowType fullRowType =
+ RowType.builder()
+ .field("dt", DataTypes.INT())
+ .field("hr", DataTypes.INT())
+ .field("name", DataTypes.STRING())
+ .field("value", DataTypes.INT())
+ .build();
+
+ @Test
+ public void testMultiplePredicatesOnSamePartitionField() {
+ PredicateBuilder builder = new PredicateBuilder(fullRowType);
+ // hr > 10 AND hr < 20 AND dt > 0 AND dt < 10
+ Predicate hrGreater = builder.greaterThan(1, 10);
+ Predicate hrLess = builder.lessThan(1, 20);
+ Predicate dtGreater = builder.greaterThan(0, 0);
+ Predicate dtLess = builder.lessThan(0, 10);
+
+ Predicate predicate = PredicateBuilder.and(hrGreater, hrLess,
dtGreater, dtLess);
+
+ Map<String, Predicate> result =
+ PredicateUtils.splitPartitionPredicate(partitionType,
predicate);
+
+ assertThat(result).hasSize(2);
+ assertThat(result).containsKey("hr");
+ assertThat(result).containsKey("dt");
+ // Should combine with AND
+ Predicate combined = result.get("hr");
+ // Test that the combined predicate works correctly
+ assertThat(combined.test(GenericRow.of(15))).isTrue();
+ assertThat(combined.test(GenericRow.of(5))).isFalse();
+ assertThat(combined.test(GenericRow.of(25))).isFalse();
+
+ combined = result.get("dt");
+ // Test that the combined predicate works correctly
+ assertThat(combined.test(GenericRow.of(5))).isTrue();
+ assertThat(combined.test(GenericRow.of(11))).isFalse();
+ assertThat(combined.test(GenericRow.of(-1))).isFalse();
+ }
+}