This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d30da0116b [spark] Support AlwaysTrue and AlwaysFalse predicates in 
SparkV2FilterConverter (#7224)
d30da0116b is described below

commit d30da0116bbc1a2263973006f8cc06247db845d2
Author: Kerwin Zhang <[email protected]>
AuthorDate: Thu Feb 26 12:28:25 2026 +0800

    [spark] Support AlwaysTrue and AlwaysFalse predicates in 
SparkV2FilterConverter (#7224)
    
    Consider a statement such as
    `DELETE FROM t1 WHERE pt IN (SELECT id FROM t2 WHERE n > 10) OR pt = 3`.
    When the subquery (filtered by n > 10) returns no rows, evalSubquery
    replaces it with Literal(false). After translateFilterV2, the condition
    becomes V2Or(ALWAYS_FALSE, V2EqualTo(pt, 3)). Without support for
    ALWAYS_FALSE in SparkV2FilterConverter, the OR conversion would fail, and
    the delete would fall back to the expensive row-level delete instead of
    partition dropping.
---
 .../org/apache/paimon/predicate/FalseFunction.java |  71 ++++++++
 .../org/apache/paimon/predicate/LeafFunction.java  |   2 +
 .../org/apache/paimon/predicate/LeafPredicate.java |   6 +
 .../predicate/PartitionPredicateVisitor.java       |   4 +-
 .../org/apache/paimon/predicate/Transform.java     |   3 +-
 .../org/apache/paimon/predicate/TrueFunction.java  |  71 ++++++++
 .../org/apache/paimon/predicate/TrueTransform.java |  84 ++++++++++
 .../apache/paimon/predicate/LeafPredicateTest.java |  96 +++++++++++
 .../paimon/predicate/PredicateJsonSerdeTest.java   |  18 +++
 .../paimon/spark/SparkV2FilterConverter.scala      |  17 +-
 .../spark/sql/SparkV2FilterConverterTestBase.scala | 179 ++++++++++++++++++++-
 11 files changed, 544 insertions(+), 7 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/FalseFunction.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/FalseFunction.java
new file mode 100644
index 0000000000..36b9300961
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/FalseFunction.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.types.DataType;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+
+import java.util.List;
+import java.util.Optional;
+
+/** A {@link LeafFunction} that always returns {@code false}. Used for 
AlwaysFalse predicates. */
+public class FalseFunction extends LeafFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final String NAME = "FALSE";
+
+    public static final FalseFunction INSTANCE = new FalseFunction();
+
+    @JsonCreator
+    private FalseFunction() {}
+
+    @Override
+    public boolean test(DataType type, Object field, List<Object> literals) {
+        return false;
+    }
+
+    @Override
+    public boolean test(
+            DataType type,
+            long rowCount,
+            Object min,
+            Object max,
+            Long nullCount,
+            List<Object> literals) {
+        return false;
+    }
+
+    @Override
+    public Optional<LeafFunction> negate() {
+        return Optional.of(TrueFunction.INSTANCE);
+    }
+
+    @Override
+    public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, 
List<Object> literals) {
+        throw new UnsupportedOperationException(
+                "FalseFunction does not support field-based visitation.");
+    }
+
+    @Override
+    public String toJson() {
+        return NAME;
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafFunction.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafFunction.java
index 5d8edc1d67..0f45e8e920 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafFunction.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafFunction.java
@@ -68,6 +68,8 @@ public abstract class LeafFunction implements Serializable {
             registry.put(NotIn.NAME, NotIn.INSTANCE);
             registry.put(Between.NAME, Between.INSTANCE);
             registry.put(NotBetween.NAME, NotBetween.INSTANCE);
+            registry.put(TrueFunction.NAME, TrueFunction.INSTANCE);
+            registry.put(FalseFunction.NAME, FalseFunction.INSTANCE);
             return Collections.unmodifiableMap(registry);
         }
     }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java
index 5e5b386311..e10b5deb5e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java
@@ -143,6 +143,9 @@ public class LeafPredicate implements Predicate {
             long rowCount, InternalRow minValues, InternalRow maxValues, 
InternalArray nullCounts) {
         Optional<FieldRef> fieldRefOptional = fieldRefOptional();
         if (!fieldRefOptional.isPresent()) {
+            if (transform instanceof TrueTransform) {
+                return function.test(transform.outputType(), 0, null, null, 
null, literals);
+            }
             return true;
         }
         FieldRef fieldRef = fieldRefOptional.get();
@@ -165,6 +168,9 @@ public class LeafPredicate implements Predicate {
 
     @Override
     public Optional<Predicate> negate() {
+        if (transform instanceof TrueTransform) {
+            return function.negate().map(neg -> new LeafPredicate(transform, 
neg, literals));
+        }
         Optional<FieldRef> fieldRefOptional = fieldRefOptional();
         if (!fieldRefOptional.isPresent()) {
             return Optional.empty();
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionPredicateVisitor.java
 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionPredicateVisitor.java
index 4db27bd1a1..94f92102e0 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionPredicateVisitor.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionPredicateVisitor.java
@@ -32,14 +32,16 @@ public class PartitionPredicateVisitor implements 
PredicateVisitor<Boolean> {
     @Override
     public Boolean visit(LeafPredicate predicate) {
         Transform transform = predicate.transform();
+        boolean hasFieldRef = false;
         for (Object input : transform.inputs()) {
             if (input instanceof FieldRef) {
+                hasFieldRef = true;
                 if (!partitionKeys.contains(((FieldRef) input).name())) {
                     return false;
                 }
             }
         }
-        return true;
+        return hasFieldRef;
     }
 
     @Override
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java
index 199e1df170..6d7b9ba699 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java
@@ -38,7 +38,8 @@ import java.util.List;
     @JsonSubTypes.Type(value = ConcatTransform.class, name = 
ConcatTransform.NAME),
     @JsonSubTypes.Type(value = ConcatWsTransform.class, name = 
ConcatWsTransform.NAME),
     @JsonSubTypes.Type(value = UpperTransform.class, name = 
UpperTransform.NAME),
-    @JsonSubTypes.Type(value = LowerTransform.class, name = 
LowerTransform.NAME)
+    @JsonSubTypes.Type(value = LowerTransform.class, name = 
LowerTransform.NAME),
+    @JsonSubTypes.Type(value = TrueTransform.class, name = TrueTransform.NAME)
 })
 public interface Transform extends Serializable {
 
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/TrueFunction.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/TrueFunction.java
new file mode 100644
index 0000000000..6ca9d057f0
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/TrueFunction.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.types.DataType;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+
+import java.util.List;
+import java.util.Optional;
+
+/** A {@link LeafFunction} that always returns {@code true}. Used for 
AlwaysTrue predicates. */
+public class TrueFunction extends LeafFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final String NAME = "TRUE";
+
+    public static final TrueFunction INSTANCE = new TrueFunction();
+
+    @JsonCreator
+    private TrueFunction() {}
+
+    @Override
+    public boolean test(DataType type, Object field, List<Object> literals) {
+        return true;
+    }
+
+    @Override
+    public boolean test(
+            DataType type,
+            long rowCount,
+            Object min,
+            Object max,
+            Long nullCount,
+            List<Object> literals) {
+        return true;
+    }
+
+    @Override
+    public Optional<LeafFunction> negate() {
+        return Optional.of(FalseFunction.INSTANCE);
+    }
+
+    @Override
+    public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, 
List<Object> literals) {
+        throw new UnsupportedOperationException(
+                "TrueFunction does not support field-based visitation.");
+    }
+
+    @Override
+    public String toJson() {
+        return NAME;
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/TrueTransform.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/TrueTransform.java
new file mode 100644
index 0000000000..2017d2e8d2
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/TrueTransform.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+
+import java.util.Collections;
+import java.util.List;
+
+/** A {@link Transform} that always returns {@code true}. Used for constant 
predicates. */
+public class TrueTransform implements Transform {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final String NAME = "TRUE";
+
+    public static final TrueTransform INSTANCE = new TrueTransform();
+
+    @JsonCreator
+    private TrueTransform() {}
+
+    @Override
+    public String name() {
+        return NAME;
+    }
+
+    @Override
+    public List<Object> inputs() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public DataType outputType() {
+        return DataTypes.BOOLEAN();
+    }
+
+    @Override
+    public Object transform(InternalRow row) {
+        return true;
+    }
+
+    @Override
+    public Transform copyWithNewInputs(List<Object> inputs) {
+        return INSTANCE;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        return o != null && getClass() == o.getClass();
+    }
+
+    @Override
+    public int hashCode() {
+        return NAME.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return NAME;
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/predicate/LeafPredicateTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/predicate/LeafPredicateTest.java
index 1368611554..0af7cf3806 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/predicate/LeafPredicateTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/predicate/LeafPredicateTest.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.predicate;
 
 import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericArray;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.InstantiationUtil;
@@ -27,6 +28,7 @@ import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -78,4 +80,98 @@ class LeafPredicateTest {
         literals.add(BinaryString.fromString("ha-he"));
         return LeafPredicate.of(transform, Equal.INSTANCE, literals);
     }
+
+    @Test
+    public void testAlwaysTrueRow() {
+        LeafPredicate predicate =
+                LeafPredicate.of(
+                        TrueTransform.INSTANCE, TrueFunction.INSTANCE, 
Collections.emptyList());
+        assertThat(predicate.test(GenericRow.of(1))).isTrue();
+        assertThat(predicate.test(GenericRow.of((Object) null))).isTrue();
+    }
+
+    @Test
+    public void testAlwaysFalseRow() {
+        LeafPredicate predicate =
+                LeafPredicate.of(
+                        TrueTransform.INSTANCE, FalseFunction.INSTANCE, 
Collections.emptyList());
+        assertThat(predicate.test(GenericRow.of(1))).isFalse();
+        assertThat(predicate.test(GenericRow.of((Object) null))).isFalse();
+    }
+
+    @Test
+    public void testAlwaysTrueMinMax() {
+        LeafPredicate predicate =
+                LeafPredicate.of(
+                        TrueTransform.INSTANCE, TrueFunction.INSTANCE, 
Collections.emptyList());
+        assertThat(
+                        predicate.test(
+                                10,
+                                GenericRow.of(1),
+                                GenericRow.of(10),
+                                new GenericArray(new long[] {0})))
+                .isTrue();
+        assertThat(predicate.test(1, null, null, null)).isTrue();
+    }
+
+    @Test
+    public void testAlwaysFalseMinMax() {
+        LeafPredicate predicate =
+                LeafPredicate.of(
+                        TrueTransform.INSTANCE, FalseFunction.INSTANCE, 
Collections.emptyList());
+        assertThat(
+                        predicate.test(
+                                10,
+                                GenericRow.of(1),
+                                GenericRow.of(10),
+                                new GenericArray(new long[] {0})))
+                .isFalse();
+        assertThat(predicate.test(1, null, null, null)).isFalse();
+    }
+
+    @Test
+    public void testAlwaysTrueNegate() {
+        LeafPredicate predicate =
+                LeafPredicate.of(
+                        TrueTransform.INSTANCE, TrueFunction.INSTANCE, 
Collections.emptyList());
+        Predicate negated = predicate.negate().get();
+        assertThat(negated).isInstanceOf(LeafPredicate.class);
+        LeafPredicate negatedLeaf = (LeafPredicate) negated;
+        assertThat(negatedLeaf.function()).isEqualTo(FalseFunction.INSTANCE);
+        assertThat(negatedLeaf.transform()).isInstanceOf(TrueTransform.class);
+        assertThat(negatedLeaf.test(GenericRow.of(1))).isFalse();
+    }
+
+    @Test
+    public void testAlwaysFalseNegate() {
+        LeafPredicate predicate =
+                LeafPredicate.of(
+                        TrueTransform.INSTANCE, FalseFunction.INSTANCE, 
Collections.emptyList());
+        Predicate negated = predicate.negate().get();
+        assertThat(negated).isInstanceOf(LeafPredicate.class);
+        LeafPredicate negatedLeaf = (LeafPredicate) negated;
+        assertThat(negatedLeaf.function()).isEqualTo(TrueFunction.INSTANCE);
+        assertThat(negatedLeaf.transform()).isInstanceOf(TrueTransform.class);
+        assertThat(negatedLeaf.test(GenericRow.of(1))).isTrue();
+    }
+
+    @Test
+    public void testAlwaysTrueSerialization() throws IOException, 
ClassNotFoundException {
+        LeafPredicate predicate =
+                LeafPredicate.of(
+                        TrueTransform.INSTANCE, TrueFunction.INSTANCE, 
Collections.emptyList());
+        LeafPredicate clone = InstantiationUtil.clone(predicate);
+        assertThat(clone).isEqualTo(predicate);
+        assertThat(clone.hashCode()).isEqualTo(predicate.hashCode());
+    }
+
+    @Test
+    public void testAlwaysFalseSerialization() throws IOException, 
ClassNotFoundException {
+        LeafPredicate predicate =
+                LeafPredicate.of(
+                        TrueTransform.INSTANCE, FalseFunction.INSTANCE, 
Collections.emptyList());
+        LeafPredicate clone = InstantiationUtil.clone(predicate);
+        assertThat(clone).isEqualTo(predicate);
+        assertThat(clone.hashCode()).isEqualTo(predicate.hashCode());
+    }
 }
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/predicate/PredicateJsonSerdeTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/predicate/PredicateJsonSerdeTest.java
index 475e889c34..3093b83eff 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/predicate/PredicateJsonSerdeTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/predicate/PredicateJsonSerdeTest.java
@@ -156,6 +156,24 @@ class PredicateJsonSerdeTest {
                         .expectJson(
                                 
"{\"kind\":\"LEAF\",\"transform\":{\"name\":\"FIELD_REF\",\"fieldRef\":{\"index\":2,\"name\":\"f2\",\"type\":\"STRING\"}},\"function\":\"LIKE\",\"literals\":[\"%a%b%\"]}"),
 
+                // LeafPredicate - TrueTransform + TrueFunction (AlwaysTrue)
+                TestSpec.forPredicate(
+                                LeafPredicate.of(
+                                        TrueTransform.INSTANCE,
+                                        TrueFunction.INSTANCE,
+                                        Collections.emptyList()))
+                        .expectJson(
+                                
"{\"kind\":\"LEAF\",\"transform\":{\"name\":\"TRUE\"},\"function\":\"TRUE\",\"literals\":[]}"),
+
+                // LeafPredicate - TrueTransform + FalseFunction (AlwaysFalse)
+                TestSpec.forPredicate(
+                                LeafPredicate.of(
+                                        TrueTransform.INSTANCE,
+                                        FalseFunction.INSTANCE,
+                                        Collections.emptyList()))
+                        .expectJson(
+                                
"{\"kind\":\"LEAF\",\"transform\":{\"name\":\"TRUE\"},\"function\":\"FALSE\",\"literals\":[]}"),
+
                 // LeafPredicate - In with many values including nulls
                 TestSpec.forPredicate(
                                 builder.in(
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
index 36e188bde7..970d619792 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.predicate.{Predicate, PredicateBuilder, Transform}
+import org.apache.paimon.predicate.{FalseFunction, LeafPredicate, Predicate, 
PredicateBuilder, Transform, TrueFunction, TrueTransform}
 import org.apache.paimon.spark.util.SparkExpressionConverter.{toPaimonLiteral, 
toPaimonTransform}
 import org.apache.paimon.types.RowType
 
@@ -164,7 +164,18 @@ case class SparkV2FilterConverter(rowType: RowType) 
extends Logging {
             throw new UnsupportedOperationException(s"Convert $sparkPredicate 
is unsupported.")
         }
 
-      // TODO: AlwaysTrue, AlwaysFalse
+      case ALWAYS_TRUE =>
+        LeafPredicate.of(
+          TrueTransform.INSTANCE,
+          TrueFunction.INSTANCE,
+          java.util.Collections.emptyList())
+
+      case ALWAYS_FALSE =>
+        LeafPredicate.of(
+          TrueTransform.INSTANCE,
+          FalseFunction.INSTANCE,
+          java.util.Collections.emptyList())
+
       case _ => throw new UnsupportedOperationException(s"Convert 
$sparkPredicate is unsupported.")
     }
   }
@@ -228,5 +239,7 @@ object SparkV2FilterConverter extends Logging {
   private val STRING_START_WITH = "STARTS_WITH"
   private val STRING_END_WITH = "ENDS_WITH"
   private val STRING_CONTAINS = "CONTAINS"
+  private val ALWAYS_TRUE = "ALWAYS_TRUE"
+  private val ALWAYS_FALSE = "ALWAYS_FALSE"
 
 }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
index 2d90777fd7..430f0785eb 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
@@ -19,7 +19,7 @@
 package org.apache.paimon.spark.sql
 
 import org.apache.paimon.data.{BinaryString, Decimal, Timestamp}
-import org.apache.paimon.predicate.PredicateBuilder
+import org.apache.paimon.predicate.{FalseFunction, LeafPredicate, 
PredicateBuilder, TrueFunction, TrueTransform}
 import org.apache.paimon.spark.{PaimonSparkTestBase, SparkV2FilterConverter}
 import 
org.apache.paimon.spark.util.shim.TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType
 import org.apache.paimon.table.source.DataSplit
@@ -29,7 +29,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.PaimonUtils.translateFilterV2
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.plans.logical.Filter
-import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.connector.expressions.filter.{And, Not, Or, 
Predicate => SparkPredicate}
 
 import java.time.{LocalDate, LocalDateTime}
 
@@ -355,7 +355,165 @@ abstract class SparkV2FilterConverterTestBase extends 
PaimonSparkTestBase {
     assert(scanFilesCount(filter) == 4)
   }
 
-  private def v2Filter(str: String, tableName: String = "test_tbl"): Predicate 
= {
+  private def paimonAlwaysTrue: org.apache.paimon.predicate.Predicate =
+    LeafPredicate.of(
+      TrueTransform.INSTANCE,
+      TrueFunction.INSTANCE,
+      java.util.Collections.emptyList())
+
+  private def paimonAlwaysFalse: org.apache.paimon.predicate.Predicate =
+    LeafPredicate.of(
+      TrueTransform.INSTANCE,
+      FalseFunction.INSTANCE,
+      java.util.Collections.emptyList())
+
+  test("V2Filter: AlwaysTrue") {
+    val sparkAlwaysTrue =
+      new SparkPredicate(
+        "ALWAYS_TRUE",
+        Array.empty[org.apache.spark.sql.connector.expressions.Expression])
+    val actual = converter.convert(sparkAlwaysTrue).get
+    assert(actual.equals(paimonAlwaysTrue))
+  }
+
+  test("V2Filter: AlwaysFalse") {
+    val sparkAlwaysFalse =
+      new SparkPredicate(
+        "ALWAYS_FALSE",
+        Array.empty[org.apache.spark.sql.connector.expressions.Expression])
+    val actual = converter.convert(sparkAlwaysFalse).get
+    assert(actual.equals(paimonAlwaysFalse))
+  }
+
+  test("V2Filter: NOT AlwaysTrue") {
+    val sparkAlwaysTrue =
+      new SparkPredicate(
+        "ALWAYS_TRUE",
+        Array.empty[org.apache.spark.sql.connector.expressions.Expression])
+    val sparkNotAlwaysTrue = new Not(sparkAlwaysTrue)
+    val actual = converter.convert(sparkNotAlwaysTrue).get
+    assert(actual.equals(paimonAlwaysFalse))
+  }
+
+  test("V2Filter: NOT AlwaysFalse") {
+    val sparkAlwaysFalse =
+      new SparkPredicate(
+        "ALWAYS_FALSE",
+        Array.empty[org.apache.spark.sql.connector.expressions.Expression])
+    val sparkNotAlwaysFalse = new Not(sparkAlwaysFalse)
+    val actual = converter.convert(sparkNotAlwaysFalse).get
+    assert(actual.equals(paimonAlwaysTrue))
+  }
+
+  test("V2Filter: AND with AlwaysTrue") {
+    val sparkAlwaysTrue =
+      new SparkPredicate(
+        "ALWAYS_TRUE",
+        Array.empty[org.apache.spark.sql.connector.expressions.Expression])
+    val intGt2 = v2Filter("int_col > 2")
+    val sparkAnd = new And(sparkAlwaysTrue, intGt2)
+    val actual = converter.convert(sparkAnd).get
+    assert(actual.equals(PredicateBuilder.and(paimonAlwaysTrue, 
builder.greaterThan(3, 2))))
+  }
+
+  test("V2Filter: OR with AlwaysFalse") {
+    val sparkAlwaysFalse =
+      new SparkPredicate(
+        "ALWAYS_FALSE",
+        Array.empty[org.apache.spark.sql.connector.expressions.Expression])
+    val intGt2 = v2Filter("int_col > 2")
+    val sparkOr = new Or(sparkAlwaysFalse, intGt2)
+    val actual = converter.convert(sparkOr).get
+    assert(actual.equals(PredicateBuilder.or(paimonAlwaysFalse, 
builder.greaterThan(3, 2))))
+  }
+
+  test("V2Filter: performance - OR(AlwaysFalse, int_col = 1) enables file 
skipping") {
+    // Before: OR(ALWAYS_FALSE, int_col = 1) conversion fails entirely -> no 
pushdown -> 4 files
+    // After:  OR(ALWAYS_FALSE, int_col = 1) converts correctly -> pushdown -> 
1 file
+    val sparkAlwaysFalse =
+      new SparkPredicate(
+        "ALWAYS_FALSE",
+        Array.empty[org.apache.spark.sql.connector.expressions.Expression])
+    val intEq1 = v2Filter("int_col = 1")
+    val sparkOr = new Or(sparkAlwaysFalse, intEq1)
+    val paimonPredicate = converter.convert(sparkOr).get
+
+    val filesScanned = scanFilesWithPredicate(paimonPredicate)
+    // Only 1 file should be scanned (the one containing int_col = 1)
+    assert(filesScanned == 1, s"Expected 1 file but scanned $filesScanned 
files")
+  }
+
+  test("V2Filter: performance - AND(AlwaysTrue, int_col > 2) enables file 
skipping") {
+    // Before: AND(ALWAYS_TRUE, int_col > 2) conversion fails entirely -> no 
pushdown -> 4 files
+    // After:  AND(ALWAYS_TRUE, int_col > 2) converts correctly -> pushdown -> 
1 file
+    val sparkAlwaysTrue =
+      new SparkPredicate(
+        "ALWAYS_TRUE",
+        Array.empty[org.apache.spark.sql.connector.expressions.Expression])
+    val intGt2 = v2Filter("int_col > 2")
+    val sparkAnd = new And(sparkAlwaysTrue, intGt2)
+    val paimonPredicate = converter.convert(sparkAnd).get
+
+    val filesScanned = scanFilesWithPredicate(paimonPredicate)
+    // Only 1 file should be scanned (the one containing int_col = 3)
+    assert(filesScanned == 1, s"Expected 1 file but scanned $filesScanned 
files")
+  }
+
+  test("V2Filter: performance - pure AlwaysFalse achieves zero I/O") {
+    // AlwaysFalse alone should skip all data files -> 0 files scanned
+    val sparkAlwaysFalse =
+      new SparkPredicate(
+        "ALWAYS_FALSE",
+        Array.empty[org.apache.spark.sql.connector.expressions.Expression])
+    val paimonPredicate = converter.convert(sparkAlwaysFalse).get
+
+    val filesScanned = scanFilesWithPredicate(paimonPredicate)
+    assert(filesScanned == 0, s"Expected 0 files but scanned $filesScanned 
files")
+  }
+
+  test("V2Filter: performance - AND(AlwaysFalse, int_col = 1) achieves zero 
I/O") {
+    // AND with AlwaysFalse should result in zero files no matter what the 
other predicate is
+    val sparkAlwaysFalse =
+      new SparkPredicate(
+        "ALWAYS_FALSE",
+        Array.empty[org.apache.spark.sql.connector.expressions.Expression])
+    val intEq1 = v2Filter("int_col = 1")
+    val sparkAnd = new And(sparkAlwaysFalse, intEq1)
+    val paimonPredicate = converter.convert(sparkAnd).get
+
+    val filesScanned = scanFilesWithPredicate(paimonPredicate)
+    assert(filesScanned == 0, s"Expected 0 files but scanned $filesScanned 
files")
+  }
+
+  test("V2Filter: performance - NOT(AlwaysTrue) achieves zero I/O") {
+    // NOT(ALWAYS_TRUE) = ALWAYS_FALSE -> 0 files scanned
+    val sparkAlwaysTrue =
+      new SparkPredicate(
+        "ALWAYS_TRUE",
+        Array.empty[org.apache.spark.sql.connector.expressions.Expression])
+    val sparkNot = new Not(sparkAlwaysTrue)
+    val paimonPredicate = converter.convert(sparkNot).get
+
+    val filesScanned = scanFilesWithPredicate(paimonPredicate)
+    assert(filesScanned == 0, s"Expected 0 files but scanned $filesScanned 
files")
+  }
+
+  test("V2Filter: performance - OR(AlwaysTrue, int_col = 1) scans all files") {
+    // OR with AlwaysTrue means everything matches -> all files scanned
+    val sparkAlwaysTrue =
+      new SparkPredicate(
+        "ALWAYS_TRUE",
+        Array.empty[org.apache.spark.sql.connector.expressions.Expression])
+    val intEq1 = v2Filter("int_col = 1")
+    val sparkOr = new Or(sparkAlwaysTrue, intEq1)
+    val paimonPredicate = converter.convert(sparkOr).get
+
+    val filesScanned = scanFilesWithPredicate(paimonPredicate)
+    // All 4 files should be scanned because AlwaysTrue in OR matches 
everything
+    assert(filesScanned == 4, s"Expected 4 files but scanned $filesScanned 
files")
+  }
+
+  private def v2Filter(str: String, tableName: String = "test_tbl"): 
SparkPredicate = {
     val condition = sql(s"SELECT * FROM $tableName WHERE 
$str").queryExecution.optimizedPlan
       .collectFirst { case f: Filter => f }
       .get
@@ -369,4 +527,19 @@ abstract class SparkV2FilterConverterTestBase extends 
PaimonSparkTestBase {
       .map(_.asInstanceOf[DataSplit].dataFiles().size())
       .sum
   }
+
+  /** Use Paimon ReadBuilder to count how many data files would be scanned for 
a given predicate. */
+  private def scanFilesWithPredicate(
+      predicate: org.apache.paimon.predicate.Predicate,
+      tableName: String = "test_tbl"): Int = {
+    loadTable(tableName)
+      .newReadBuilder()
+      .withFilter(predicate)
+      .newScan()
+      .plan()
+      .splits()
+      .asScala
+      .map(_.asInstanceOf[DataSplit].dataFiles().size())
+      .sum
+  }
 }

Reply via email to