This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d30da0116b [spark] Support AlwaysTrue and AlwaysFalse predicates in
SparkV2FilterConverter (#7224)
d30da0116b is described below
commit d30da0116bbc1a2263973006f8cc06247db845d2
Author: Kerwin Zhang <[email protected]>
AuthorDate: Thu Feb 26 12:28:25 2026 +0800
[spark] Support AlwaysTrue and AlwaysFalse predicates in
SparkV2FilterConverter (#7224)
`DELETE FROM t1 WHERE pt IN (SELECT id FROM t2 WHERE n > 10) OR pt = 3`
The subquery (n > 10) returns an empty result, so evalSubquery replaces it with
Literal(false). After translateFilterV2, this becomes: V2Or(ALWAYS_FALSE,
V2EqualTo(pt, 3)). Without supporting ALWAYS_FALSE in SparkV2FilterConverter,
the OR conversion would fail, falling back to the expensive row-level delete
instead of partition dropping.
---
.../org/apache/paimon/predicate/FalseFunction.java | 71 ++++++++
.../org/apache/paimon/predicate/LeafFunction.java | 2 +
.../org/apache/paimon/predicate/LeafPredicate.java | 6 +
.../predicate/PartitionPredicateVisitor.java | 4 +-
.../org/apache/paimon/predicate/Transform.java | 3 +-
.../org/apache/paimon/predicate/TrueFunction.java | 71 ++++++++
.../org/apache/paimon/predicate/TrueTransform.java | 84 ++++++++++
.../apache/paimon/predicate/LeafPredicateTest.java | 96 +++++++++++
.../paimon/predicate/PredicateJsonSerdeTest.java | 18 +++
.../paimon/spark/SparkV2FilterConverter.scala | 17 +-
.../spark/sql/SparkV2FilterConverterTestBase.scala | 179 ++++++++++++++++++++-
11 files changed, 544 insertions(+), 7 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/predicate/FalseFunction.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/FalseFunction.java
new file mode 100644
index 0000000000..36b9300961
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/FalseFunction.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.types.DataType;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+
+import java.util.List;
+import java.util.Optional;
+
+/** A {@link LeafFunction} that always returns {@code false}. Used for
AlwaysFalse predicates. */
+public class FalseFunction extends LeafFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final String NAME = "FALSE";
+
+ public static final FalseFunction INSTANCE = new FalseFunction();
+
+ @JsonCreator
+ private FalseFunction() {}
+
+ @Override
+ public boolean test(DataType type, Object field, List<Object> literals) {
+ return false;
+ }
+
+ @Override
+ public boolean test(
+ DataType type,
+ long rowCount,
+ Object min,
+ Object max,
+ Long nullCount,
+ List<Object> literals) {
+ return false;
+ }
+
+ @Override
+ public Optional<LeafFunction> negate() {
+ return Optional.of(TrueFunction.INSTANCE);
+ }
+
+ @Override
+ public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef,
List<Object> literals) {
+ throw new UnsupportedOperationException(
+ "FalseFunction does not support field-based visitation.");
+ }
+
+ @Override
+ public String toJson() {
+ return NAME;
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafFunction.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafFunction.java
index 5d8edc1d67..0f45e8e920 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafFunction.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafFunction.java
@@ -68,6 +68,8 @@ public abstract class LeafFunction implements Serializable {
registry.put(NotIn.NAME, NotIn.INSTANCE);
registry.put(Between.NAME, Between.INSTANCE);
registry.put(NotBetween.NAME, NotBetween.INSTANCE);
+ registry.put(TrueFunction.NAME, TrueFunction.INSTANCE);
+ registry.put(FalseFunction.NAME, FalseFunction.INSTANCE);
return Collections.unmodifiableMap(registry);
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java
index 5e5b386311..e10b5deb5e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java
@@ -143,6 +143,9 @@ public class LeafPredicate implements Predicate {
long rowCount, InternalRow minValues, InternalRow maxValues,
InternalArray nullCounts) {
Optional<FieldRef> fieldRefOptional = fieldRefOptional();
if (!fieldRefOptional.isPresent()) {
+ if (transform instanceof TrueTransform) {
+ return function.test(transform.outputType(), 0, null, null,
null, literals);
+ }
return true;
}
FieldRef fieldRef = fieldRefOptional.get();
@@ -165,6 +168,9 @@ public class LeafPredicate implements Predicate {
@Override
public Optional<Predicate> negate() {
+ if (transform instanceof TrueTransform) {
+ return function.negate().map(neg -> new LeafPredicate(transform,
neg, literals));
+ }
Optional<FieldRef> fieldRefOptional = fieldRefOptional();
if (!fieldRefOptional.isPresent()) {
return Optional.empty();
diff --git
a/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionPredicateVisitor.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionPredicateVisitor.java
index 4db27bd1a1..94f92102e0 100644
---
a/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionPredicateVisitor.java
+++
b/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionPredicateVisitor.java
@@ -32,14 +32,16 @@ public class PartitionPredicateVisitor implements
PredicateVisitor<Boolean> {
@Override
public Boolean visit(LeafPredicate predicate) {
Transform transform = predicate.transform();
+ boolean hasFieldRef = false;
for (Object input : transform.inputs()) {
if (input instanceof FieldRef) {
+ hasFieldRef = true;
if (!partitionKeys.contains(((FieldRef) input).name())) {
return false;
}
}
}
- return true;
+ return hasFieldRef;
}
@Override
diff --git
a/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java
index 199e1df170..6d7b9ba699 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java
@@ -38,7 +38,8 @@ import java.util.List;
@JsonSubTypes.Type(value = ConcatTransform.class, name =
ConcatTransform.NAME),
@JsonSubTypes.Type(value = ConcatWsTransform.class, name =
ConcatWsTransform.NAME),
@JsonSubTypes.Type(value = UpperTransform.class, name =
UpperTransform.NAME),
- @JsonSubTypes.Type(value = LowerTransform.class, name =
LowerTransform.NAME)
+ @JsonSubTypes.Type(value = LowerTransform.class, name =
LowerTransform.NAME),
+ @JsonSubTypes.Type(value = TrueTransform.class, name = TrueTransform.NAME)
})
public interface Transform extends Serializable {
diff --git
a/paimon-common/src/main/java/org/apache/paimon/predicate/TrueFunction.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/TrueFunction.java
new file mode 100644
index 0000000000..6ca9d057f0
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/TrueFunction.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.types.DataType;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+
+import java.util.List;
+import java.util.Optional;
+
+/** A {@link LeafFunction} that always returns {@code true}. Used for
AlwaysTrue predicates. */
+public class TrueFunction extends LeafFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final String NAME = "TRUE";
+
+ public static final TrueFunction INSTANCE = new TrueFunction();
+
+ @JsonCreator
+ private TrueFunction() {}
+
+ @Override
+ public boolean test(DataType type, Object field, List<Object> literals) {
+ return true;
+ }
+
+ @Override
+ public boolean test(
+ DataType type,
+ long rowCount,
+ Object min,
+ Object max,
+ Long nullCount,
+ List<Object> literals) {
+ return true;
+ }
+
+ @Override
+ public Optional<LeafFunction> negate() {
+ return Optional.of(FalseFunction.INSTANCE);
+ }
+
+ @Override
+ public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef,
List<Object> literals) {
+ throw new UnsupportedOperationException(
+ "TrueFunction does not support field-based visitation.");
+ }
+
+ @Override
+ public String toJson() {
+ return NAME;
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/predicate/TrueTransform.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/TrueTransform.java
new file mode 100644
index 0000000000..2017d2e8d2
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/TrueTransform.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+
+import java.util.Collections;
+import java.util.List;
+
+/** A {@link Transform} that always returns {@code true}. Used for constant
predicates. */
+public class TrueTransform implements Transform {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final String NAME = "TRUE";
+
+ public static final TrueTransform INSTANCE = new TrueTransform();
+
+ @JsonCreator
+ private TrueTransform() {}
+
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ @Override
+ public List<Object> inputs() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public DataType outputType() {
+ return DataTypes.BOOLEAN();
+ }
+
+ @Override
+ public Object transform(InternalRow row) {
+ return true;
+ }
+
+ @Override
+ public Transform copyWithNewInputs(List<Object> inputs) {
+ return INSTANCE;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ return o != null && getClass() == o.getClass();
+ }
+
+ @Override
+ public int hashCode() {
+ return NAME.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return NAME;
+ }
+}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/predicate/LeafPredicateTest.java
b/paimon-common/src/test/java/org/apache/paimon/predicate/LeafPredicateTest.java
index 1368611554..0af7cf3806 100644
---
a/paimon-common/src/test/java/org/apache/paimon/predicate/LeafPredicateTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/predicate/LeafPredicateTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.predicate;
import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.InstantiationUtil;
@@ -27,6 +28,7 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
@@ -78,4 +80,98 @@ class LeafPredicateTest {
literals.add(BinaryString.fromString("ha-he"));
return LeafPredicate.of(transform, Equal.INSTANCE, literals);
}
+
+ @Test
+ public void testAlwaysTrueRow() {
+ LeafPredicate predicate =
+ LeafPredicate.of(
+ TrueTransform.INSTANCE, TrueFunction.INSTANCE,
Collections.emptyList());
+ assertThat(predicate.test(GenericRow.of(1))).isTrue();
+ assertThat(predicate.test(GenericRow.of((Object) null))).isTrue();
+ }
+
+ @Test
+ public void testAlwaysFalseRow() {
+ LeafPredicate predicate =
+ LeafPredicate.of(
+ TrueTransform.INSTANCE, FalseFunction.INSTANCE,
Collections.emptyList());
+ assertThat(predicate.test(GenericRow.of(1))).isFalse();
+ assertThat(predicate.test(GenericRow.of((Object) null))).isFalse();
+ }
+
+ @Test
+ public void testAlwaysTrueMinMax() {
+ LeafPredicate predicate =
+ LeafPredicate.of(
+ TrueTransform.INSTANCE, TrueFunction.INSTANCE,
Collections.emptyList());
+ assertThat(
+ predicate.test(
+ 10,
+ GenericRow.of(1),
+ GenericRow.of(10),
+ new GenericArray(new long[] {0})))
+ .isTrue();
+ assertThat(predicate.test(1, null, null, null)).isTrue();
+ }
+
+ @Test
+ public void testAlwaysFalseMinMax() {
+ LeafPredicate predicate =
+ LeafPredicate.of(
+ TrueTransform.INSTANCE, FalseFunction.INSTANCE,
Collections.emptyList());
+ assertThat(
+ predicate.test(
+ 10,
+ GenericRow.of(1),
+ GenericRow.of(10),
+ new GenericArray(new long[] {0})))
+ .isFalse();
+ assertThat(predicate.test(1, null, null, null)).isFalse();
+ }
+
+ @Test
+ public void testAlwaysTrueNegate() {
+ LeafPredicate predicate =
+ LeafPredicate.of(
+ TrueTransform.INSTANCE, TrueFunction.INSTANCE,
Collections.emptyList());
+ Predicate negated = predicate.negate().get();
+ assertThat(negated).isInstanceOf(LeafPredicate.class);
+ LeafPredicate negatedLeaf = (LeafPredicate) negated;
+ assertThat(negatedLeaf.function()).isEqualTo(FalseFunction.INSTANCE);
+ assertThat(negatedLeaf.transform()).isInstanceOf(TrueTransform.class);
+ assertThat(negatedLeaf.test(GenericRow.of(1))).isFalse();
+ }
+
+ @Test
+ public void testAlwaysFalseNegate() {
+ LeafPredicate predicate =
+ LeafPredicate.of(
+ TrueTransform.INSTANCE, FalseFunction.INSTANCE,
Collections.emptyList());
+ Predicate negated = predicate.negate().get();
+ assertThat(negated).isInstanceOf(LeafPredicate.class);
+ LeafPredicate negatedLeaf = (LeafPredicate) negated;
+ assertThat(negatedLeaf.function()).isEqualTo(TrueFunction.INSTANCE);
+ assertThat(negatedLeaf.transform()).isInstanceOf(TrueTransform.class);
+ assertThat(negatedLeaf.test(GenericRow.of(1))).isTrue();
+ }
+
+ @Test
+ public void testAlwaysTrueSerialization() throws IOException,
ClassNotFoundException {
+ LeafPredicate predicate =
+ LeafPredicate.of(
+ TrueTransform.INSTANCE, TrueFunction.INSTANCE,
Collections.emptyList());
+ LeafPredicate clone = InstantiationUtil.clone(predicate);
+ assertThat(clone).isEqualTo(predicate);
+ assertThat(clone.hashCode()).isEqualTo(predicate.hashCode());
+ }
+
+ @Test
+ public void testAlwaysFalseSerialization() throws IOException,
ClassNotFoundException {
+ LeafPredicate predicate =
+ LeafPredicate.of(
+ TrueTransform.INSTANCE, FalseFunction.INSTANCE,
Collections.emptyList());
+ LeafPredicate clone = InstantiationUtil.clone(predicate);
+ assertThat(clone).isEqualTo(predicate);
+ assertThat(clone.hashCode()).isEqualTo(predicate.hashCode());
+ }
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/predicate/PredicateJsonSerdeTest.java
b/paimon-common/src/test/java/org/apache/paimon/predicate/PredicateJsonSerdeTest.java
index 475e889c34..3093b83eff 100644
---
a/paimon-common/src/test/java/org/apache/paimon/predicate/PredicateJsonSerdeTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/predicate/PredicateJsonSerdeTest.java
@@ -156,6 +156,24 @@ class PredicateJsonSerdeTest {
.expectJson(
"{\"kind\":\"LEAF\",\"transform\":{\"name\":\"FIELD_REF\",\"fieldRef\":{\"index\":2,\"name\":\"f2\",\"type\":\"STRING\"}},\"function\":\"LIKE\",\"literals\":[\"%a%b%\"]}"),
+ // LeafPredicate - TrueTransform + TrueFunction (AlwaysTrue)
+ TestSpec.forPredicate(
+ LeafPredicate.of(
+ TrueTransform.INSTANCE,
+ TrueFunction.INSTANCE,
+ Collections.emptyList()))
+ .expectJson(
+
"{\"kind\":\"LEAF\",\"transform\":{\"name\":\"TRUE\"},\"function\":\"TRUE\",\"literals\":[]}"),
+
+ // LeafPredicate - TrueTransform + FalseFunction (AlwaysFalse)
+ TestSpec.forPredicate(
+ LeafPredicate.of(
+ TrueTransform.INSTANCE,
+ FalseFunction.INSTANCE,
+ Collections.emptyList()))
+ .expectJson(
+
"{\"kind\":\"LEAF\",\"transform\":{\"name\":\"TRUE\"},\"function\":\"FALSE\",\"literals\":[]}"),
+
// LeafPredicate - In with many values including nulls
TestSpec.forPredicate(
builder.in(
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
index 36e188bde7..970d619792 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
@@ -18,7 +18,7 @@
package org.apache.paimon.spark
-import org.apache.paimon.predicate.{Predicate, PredicateBuilder, Transform}
+import org.apache.paimon.predicate.{FalseFunction, LeafPredicate, Predicate,
PredicateBuilder, Transform, TrueFunction, TrueTransform}
import org.apache.paimon.spark.util.SparkExpressionConverter.{toPaimonLiteral,
toPaimonTransform}
import org.apache.paimon.types.RowType
@@ -164,7 +164,18 @@ case class SparkV2FilterConverter(rowType: RowType)
extends Logging {
throw new UnsupportedOperationException(s"Convert $sparkPredicate
is unsupported.")
}
- // TODO: AlwaysTrue, AlwaysFalse
+ case ALWAYS_TRUE =>
+ LeafPredicate.of(
+ TrueTransform.INSTANCE,
+ TrueFunction.INSTANCE,
+ java.util.Collections.emptyList())
+
+ case ALWAYS_FALSE =>
+ LeafPredicate.of(
+ TrueTransform.INSTANCE,
+ FalseFunction.INSTANCE,
+ java.util.Collections.emptyList())
+
case _ => throw new UnsupportedOperationException(s"Convert
$sparkPredicate is unsupported.")
}
}
@@ -228,5 +239,7 @@ object SparkV2FilterConverter extends Logging {
private val STRING_START_WITH = "STARTS_WITH"
private val STRING_END_WITH = "ENDS_WITH"
private val STRING_CONTAINS = "CONTAINS"
+ private val ALWAYS_TRUE = "ALWAYS_TRUE"
+ private val ALWAYS_FALSE = "ALWAYS_FALSE"
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
index 2d90777fd7..430f0785eb 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
@@ -19,7 +19,7 @@
package org.apache.paimon.spark.sql
import org.apache.paimon.data.{BinaryString, Decimal, Timestamp}
-import org.apache.paimon.predicate.PredicateBuilder
+import org.apache.paimon.predicate.{FalseFunction, LeafPredicate,
PredicateBuilder, TrueFunction, TrueTransform}
import org.apache.paimon.spark.{PaimonSparkTestBase, SparkV2FilterConverter}
import
org.apache.paimon.spark.util.shim.TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType
import org.apache.paimon.table.source.DataSplit
@@ -29,7 +29,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.PaimonUtils.translateFilterV2
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.plans.logical.Filter
-import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.connector.expressions.filter.{And, Not, Or,
Predicate => SparkPredicate}
import java.time.{LocalDate, LocalDateTime}
@@ -355,7 +355,165 @@ abstract class SparkV2FilterConverterTestBase extends
PaimonSparkTestBase {
assert(scanFilesCount(filter) == 4)
}
- private def v2Filter(str: String, tableName: String = "test_tbl"): Predicate
= {
+ private def paimonAlwaysTrue: org.apache.paimon.predicate.Predicate =
+ LeafPredicate.of(
+ TrueTransform.INSTANCE,
+ TrueFunction.INSTANCE,
+ java.util.Collections.emptyList())
+
+ private def paimonAlwaysFalse: org.apache.paimon.predicate.Predicate =
+ LeafPredicate.of(
+ TrueTransform.INSTANCE,
+ FalseFunction.INSTANCE,
+ java.util.Collections.emptyList())
+
+ test("V2Filter: AlwaysTrue") {
+ val sparkAlwaysTrue =
+ new SparkPredicate(
+ "ALWAYS_TRUE",
+ Array.empty[org.apache.spark.sql.connector.expressions.Expression])
+ val actual = converter.convert(sparkAlwaysTrue).get
+ assert(actual.equals(paimonAlwaysTrue))
+ }
+
+ test("V2Filter: AlwaysFalse") {
+ val sparkAlwaysFalse =
+ new SparkPredicate(
+ "ALWAYS_FALSE",
+ Array.empty[org.apache.spark.sql.connector.expressions.Expression])
+ val actual = converter.convert(sparkAlwaysFalse).get
+ assert(actual.equals(paimonAlwaysFalse))
+ }
+
+ test("V2Filter: NOT AlwaysTrue") {
+ val sparkAlwaysTrue =
+ new SparkPredicate(
+ "ALWAYS_TRUE",
+ Array.empty[org.apache.spark.sql.connector.expressions.Expression])
+ val sparkNotAlwaysTrue = new Not(sparkAlwaysTrue)
+ val actual = converter.convert(sparkNotAlwaysTrue).get
+ assert(actual.equals(paimonAlwaysFalse))
+ }
+
+ test("V2Filter: NOT AlwaysFalse") {
+ val sparkAlwaysFalse =
+ new SparkPredicate(
+ "ALWAYS_FALSE",
+ Array.empty[org.apache.spark.sql.connector.expressions.Expression])
+ val sparkNotAlwaysFalse = new Not(sparkAlwaysFalse)
+ val actual = converter.convert(sparkNotAlwaysFalse).get
+ assert(actual.equals(paimonAlwaysTrue))
+ }
+
+ test("V2Filter: AND with AlwaysTrue") {
+ val sparkAlwaysTrue =
+ new SparkPredicate(
+ "ALWAYS_TRUE",
+ Array.empty[org.apache.spark.sql.connector.expressions.Expression])
+ val intGt2 = v2Filter("int_col > 2")
+ val sparkAnd = new And(sparkAlwaysTrue, intGt2)
+ val actual = converter.convert(sparkAnd).get
+ assert(actual.equals(PredicateBuilder.and(paimonAlwaysTrue,
builder.greaterThan(3, 2))))
+ }
+
+ test("V2Filter: OR with AlwaysFalse") {
+ val sparkAlwaysFalse =
+ new SparkPredicate(
+ "ALWAYS_FALSE",
+ Array.empty[org.apache.spark.sql.connector.expressions.Expression])
+ val intGt2 = v2Filter("int_col > 2")
+ val sparkOr = new Or(sparkAlwaysFalse, intGt2)
+ val actual = converter.convert(sparkOr).get
+ assert(actual.equals(PredicateBuilder.or(paimonAlwaysFalse,
builder.greaterThan(3, 2))))
+ }
+
+ test("V2Filter: performance - OR(AlwaysFalse, int_col = 1) enables file
skipping") {
+ // Before: OR(ALWAYS_FALSE, int_col = 1) conversion fails entirely -> no
pushdown -> 4 files
+ // After: OR(ALWAYS_FALSE, int_col = 1) converts correctly -> pushdown ->
1 file
+ val sparkAlwaysFalse =
+ new SparkPredicate(
+ "ALWAYS_FALSE",
+ Array.empty[org.apache.spark.sql.connector.expressions.Expression])
+ val intEq1 = v2Filter("int_col = 1")
+ val sparkOr = new Or(sparkAlwaysFalse, intEq1)
+ val paimonPredicate = converter.convert(sparkOr).get
+
+ val filesScanned = scanFilesWithPredicate(paimonPredicate)
+ // Only 1 file should be scanned (the one containing int_col = 1)
+ assert(filesScanned == 1, s"Expected 1 file but scanned $filesScanned
files")
+ }
+
+ test("V2Filter: performance - AND(AlwaysTrue, int_col > 2) enables file
skipping") {
+ // Before: AND(ALWAYS_TRUE, int_col > 2) conversion fails entirely -> no
pushdown -> 4 files
+ // After: AND(ALWAYS_TRUE, int_col > 2) converts correctly -> pushdown ->
1 file
+ val sparkAlwaysTrue =
+ new SparkPredicate(
+ "ALWAYS_TRUE",
+ Array.empty[org.apache.spark.sql.connector.expressions.Expression])
+ val intGt2 = v2Filter("int_col > 2")
+ val sparkAnd = new And(sparkAlwaysTrue, intGt2)
+ val paimonPredicate = converter.convert(sparkAnd).get
+
+ val filesScanned = scanFilesWithPredicate(paimonPredicate)
+ // Only 1 file should be scanned (the one containing int_col = 3)
+ assert(filesScanned == 1, s"Expected 1 file but scanned $filesScanned
files")
+ }
+
+ test("V2Filter: performance - pure AlwaysFalse achieves zero I/O") {
+ // AlwaysFalse alone should skip all data files -> 0 files scanned
+ val sparkAlwaysFalse =
+ new SparkPredicate(
+ "ALWAYS_FALSE",
+ Array.empty[org.apache.spark.sql.connector.expressions.Expression])
+ val paimonPredicate = converter.convert(sparkAlwaysFalse).get
+
+ val filesScanned = scanFilesWithPredicate(paimonPredicate)
+ assert(filesScanned == 0, s"Expected 0 files but scanned $filesScanned
files")
+ }
+
+ test("V2Filter: performance - AND(AlwaysFalse, int_col = 1) achieves zero
I/O") {
+ // AND with AlwaysFalse should result in zero files no matter what the
other predicate is
+ val sparkAlwaysFalse =
+ new SparkPredicate(
+ "ALWAYS_FALSE",
+ Array.empty[org.apache.spark.sql.connector.expressions.Expression])
+ val intEq1 = v2Filter("int_col = 1")
+ val sparkAnd = new And(sparkAlwaysFalse, intEq1)
+ val paimonPredicate = converter.convert(sparkAnd).get
+
+ val filesScanned = scanFilesWithPredicate(paimonPredicate)
+ assert(filesScanned == 0, s"Expected 0 files but scanned $filesScanned
files")
+ }
+
+ test("V2Filter: performance - NOT(AlwaysTrue) achieves zero I/O") {
+ // NOT(ALWAYS_TRUE) = ALWAYS_FALSE -> 0 files scanned
+ val sparkAlwaysTrue =
+ new SparkPredicate(
+ "ALWAYS_TRUE",
+ Array.empty[org.apache.spark.sql.connector.expressions.Expression])
+ val sparkNot = new Not(sparkAlwaysTrue)
+ val paimonPredicate = converter.convert(sparkNot).get
+
+ val filesScanned = scanFilesWithPredicate(paimonPredicate)
+ assert(filesScanned == 0, s"Expected 0 files but scanned $filesScanned
files")
+ }
+
+ test("V2Filter: performance - OR(AlwaysTrue, int_col = 1) scans all files") {
+ // OR with AlwaysTrue means everything matches -> all files scanned
+ val sparkAlwaysTrue =
+ new SparkPredicate(
+ "ALWAYS_TRUE",
+ Array.empty[org.apache.spark.sql.connector.expressions.Expression])
+ val intEq1 = v2Filter("int_col = 1")
+ val sparkOr = new Or(sparkAlwaysTrue, intEq1)
+ val paimonPredicate = converter.convert(sparkOr).get
+
+ val filesScanned = scanFilesWithPredicate(paimonPredicate)
+ // All 4 files should be scanned because AlwaysTrue in OR matches
everything
+ assert(filesScanned == 4, s"Expected 4 files but scanned $filesScanned
files")
+ }
+
+ private def v2Filter(str: String, tableName: String = "test_tbl"):
SparkPredicate = {
val condition = sql(s"SELECT * FROM $tableName WHERE
$str").queryExecution.optimizedPlan
.collectFirst { case f: Filter => f }
.get
@@ -369,4 +527,19 @@ abstract class SparkV2FilterConverterTestBase extends
PaimonSparkTestBase {
.map(_.asInstanceOf[DataSplit].dataFiles().size())
.sum
}
+
+ /** Use Paimon ReadBuilder to count how many data files would be scanned for
a given predicate. */
+ private def scanFilesWithPredicate(
+ predicate: org.apache.paimon.predicate.Predicate,
+ tableName: String = "test_tbl"): Int = {
+ loadTable(tableName)
+ .newReadBuilder()
+ .withFilter(predicate)
+ .newScan()
+ .plan()
+ .splits()
+ .asScala
+ .map(_.asInstanceOf[DataSplit].dataFiles().size())
+ .sum
+ }
}