This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d442b88ff0 [core][spark] Support push down transform predicate (#6506)
d442b88ff0 is described below

commit d442b88ff02e1b04132799d1378e34c6fab56611
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Nov 3 17:59:14 2025 +0800

    [core][spark] Support push down transform predicate (#6506)
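
    A minimal sketch of the effect (table and column names are illustrative;
    the new test at the end of this diff shows a full setup): a filter over a
    supported transform such as CONCAT is now pushed into Paimon and used for
    partition pruning, instead of being evaluated after the scan.

        // Pruned at scan time when year/month are partition columns.
        spark.sql("SELECT * FROM t WHERE CONCAT(year, '-', month) = '2025-07'")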
---
 .../apache/paimon/predicate/ConcatTransform.java   |   5 +
 .../apache/paimon/predicate/ConcatWsTransform.java |   5 +
 .../apache/paimon/predicate/FieldTransform.java    |  77 ++++++++
 .../org/apache/paimon/predicate/LeafPredicate.java |  77 +++-----
 .../apache/paimon/predicate/PredicateBuilder.java  | 115 +++++++++--
 .../org/apache/paimon/predicate/Transform.java     |   2 +
 .../paimon/predicate/TransformPredicate.java       |  32 +++-
 .../paimon/predicate/TransformPredicateTest.java   |   2 +-
 .../apache/paimon/spark/PaimonBasePushDown.scala   |   4 +-
 .../org/apache/paimon/spark/PaimonBaseScan.scala   |   2 -
 .../scala/org/apache/paimon/spark/PaimonScan.scala |   3 +-
 .../paimon/spark/SparkV2FilterConverter.scala      | 213 ++++++++-------------
 .../analysis/expressions/ExpressionHelper.scala    |   1 -
 .../spark/util/SparkExpressionConverter.scala      | 108 +++++++++++
 .../sql/connector/catalog/PaimonCatalogUtils.scala |   3 +-
 .../paimon/spark/sql/PaimonPushDownTestBase.scala  |  42 ++++
 16 files changed, 481 insertions(+), 210 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/ConcatTransform.java b/paimon-common/src/main/java/org/apache/paimon/predicate/ConcatTransform.java
index 4c99df2dfc..a220beca9f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/ConcatTransform.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/ConcatTransform.java
@@ -35,4 +35,9 @@ public class ConcatTransform extends StringTransform {
     public BinaryString transform(List<BinaryString> inputs) {
         return BinaryString.concat(inputs);
     }
+
+    @Override
+    public Transform withNewInputs(List<Object> inputs) {
+        return new ConcatTransform(inputs);
+    }
 }
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/ConcatWsTransform.java b/paimon-common/src/main/java/org/apache/paimon/predicate/ConcatWsTransform.java
index b121799cd8..5c7df4f47d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/ConcatWsTransform.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/ConcatWsTransform.java
@@ -39,4 +39,9 @@ public class ConcatWsTransform extends StringTransform {
         BinaryString separator = inputs.get(0);
         return BinaryString.concatWs(separator, inputs.subList(1, inputs.size()));
     }
+
+    @Override
+    public Transform withNewInputs(List<Object> inputs) {
+        return new ConcatWsTransform(inputs);
+    }
 }
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/FieldTransform.java b/paimon-common/src/main/java/org/apache/paimon/predicate/FieldTransform.java
new file mode 100644
index 0000000000..4b842c39ba
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/FieldTransform.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.DataType;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.paimon.utils.InternalRowUtils.get;
+
+/** Transform that extracts a field from a row. */
+public class FieldTransform implements Transform {
+
+    private final FieldRef fieldRef;
+
+    public FieldTransform(FieldRef fieldRef) {
+        this.fieldRef = fieldRef;
+    }
+
+    public FieldRef fieldRef() {
+        return fieldRef;
+    }
+
+    @Override
+    public List<Object> inputs() {
+        return Collections.singletonList(fieldRef);
+    }
+
+    @Override
+    public DataType outputType() {
+        return fieldRef.type();
+    }
+
+    @Override
+    public Object transform(InternalRow row) {
+        return get(row, fieldRef.index(), fieldRef.type());
+    }
+
+    @Override
+    public Transform withNewInputs(List<Object> inputs) {
+        assert inputs.size() == 1;
+        return new FieldTransform((FieldRef) inputs.get(0));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        FieldTransform that = (FieldTransform) o;
+        return Objects.equals(fieldRef, that.fieldRef);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(fieldRef);
+    }
+}
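
A minimal sketch of the new class in action (values are illustrative): a
FieldTransform reads one field out of a row, so its output type is simply the
field's type.

    import org.apache.paimon.data.{BinaryString, GenericRow}
    import org.apache.paimon.predicate.{FieldRef, FieldTransform}
    import org.apache.paimon.types.DataTypes

    val transform = new FieldTransform(new FieldRef(0, "name", DataTypes.STRING()))
    val row = GenericRow.of(BinaryString.fromString("paimon"))
    // Reads field 0 from the row: yields BinaryString "paimon"; outputType() is STRING.
    val value = transform.transform(row)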
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java
index 5267f7069f..0ec9c03a3f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java
@@ -30,6 +30,7 @@ import org.apache.paimon.types.DataType;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -37,28 +38,22 @@ import java.util.Optional;
 import static org.apache.paimon.utils.InternalRowUtils.get;
 
 /** Leaf node of a {@link Predicate} tree. Compares a field in the row with literals. */
-public class LeafPredicate implements Predicate {
+public class LeafPredicate extends TransformPredicate {
 
     private static final long serialVersionUID = 1L;
 
-    private final LeafFunction function;
-    private final DataType type;
-    private final int fieldIndex;
-    private final String fieldName;
-
-    private transient List<Object> literals;
-
     public LeafPredicate(
             LeafFunction function,
             DataType type,
             int fieldIndex,
             String fieldName,
             List<Object> literals) {
-        this.function = function;
-        this.type = type;
-        this.fieldIndex = fieldIndex;
-        this.fieldName = fieldName;
-        this.literals = literals;
+        this(new FieldTransform(new FieldRef(fieldIndex, fieldName, type)), function, literals);
+    }
+
+    public LeafPredicate(
+            FieldTransform fieldTransform, LeafFunction function, List<Object> literals) {
+        super(fieldTransform, function, literals);
     }
 
     public LeafFunction function() {
@@ -66,19 +61,23 @@ public class LeafPredicate implements Predicate {
     }
 
     public DataType type() {
-        return type;
+        return fieldRef().type();
     }
 
     public int index() {
-        return fieldIndex;
+        return fieldRef().index();
     }
 
     public String fieldName() {
-        return fieldName;
+        return fieldRef().name();
+    }
+
+    public List<String> fieldNames() {
+        return Collections.singletonList(fieldRef().name());
     }
 
     public FieldRef fieldRef() {
-        return new FieldRef(fieldIndex, fieldName, type);
+        return ((FieldTransform) transform).fieldRef();
     }
 
     public List<Object> literals() {
@@ -86,20 +85,15 @@ public class LeafPredicate implements Predicate {
     }
 
     public LeafPredicate copyWithNewIndex(int fieldIndex) {
-        return new LeafPredicate(function, type, fieldIndex, fieldName, literals);
-    }
-
-    @Override
-    public boolean test(InternalRow row) {
-        return function.test(type, get(row, fieldIndex, type), literals);
+        return new LeafPredicate(function, type(), fieldIndex, fieldName(), literals);
     }
 
     @Override
     public boolean test(
             long rowCount, InternalRow minValues, InternalRow maxValues, InternalArray nullCounts) {
-        Object min = get(minValues, fieldIndex, type);
-        Object max = get(maxValues, fieldIndex, type);
-        Long nullCount = nullCounts.isNullAt(fieldIndex) ? null : nullCounts.getLong(fieldIndex);
+        Object min = get(minValues, index(), type());
+        Object max = get(maxValues, index(), type());
+        Long nullCount = nullCounts.isNullAt(index()) ? null : nullCounts.getLong(index());
         if (nullCount == null || rowCount != nullCount) {
             // not all null
             // min or max is null
@@ -108,13 +102,13 @@ public class LeafPredicate implements Predicate {
                 return true;
             }
         }
-        return function.test(type, rowCount, min, max, nullCount, literals);
+        return function.test(type(), rowCount, min, max, nullCount, literals);
     }
 
     @Override
     public Optional<Predicate> negate() {
         return function.negate()
-                .map(negate -> new LeafPredicate(negate, type, fieldIndex, fieldName, literals));
+                .map(negate -> new LeafPredicate(negate, type(), index(), fieldName(), literals));
     }
 
     @Override
@@ -122,27 +116,6 @@ public class LeafPredicate implements Predicate {
         return visitor.visit(this);
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        LeafPredicate that = (LeafPredicate) o;
-        return fieldIndex == that.fieldIndex
-                && Objects.equals(fieldName, that.fieldName)
-                && Objects.equals(function, that.function)
-                && Objects.equals(type, that.type)
-                && Objects.equals(literals, that.literals);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(function, type, fieldIndex, fieldName, literals);
-    }
-
     @Override
     public String toString() {
         String literalsStr;
@@ -154,13 +127,13 @@ public class LeafPredicate implements Predicate {
             literalsStr = literals.toString();
         }
         return literalsStr.isEmpty()
-                ? function + "(" + fieldName + ")"
-                : function + "(" + fieldName + ", " + literalsStr + ")";
+                ? function + "(" + fieldName() + ")"
+                : function + "(" + fieldName() + ", " + literalsStr + ")";
     }
 
     private ListSerializer<Object> objectsSerializer() {
         return new ListSerializer<>(
-                NullableSerializer.wrapIfNullIsNotSupported(InternalSerializers.create(type)));
+                NullableSerializer.wrapIfNullIsNotSupported(InternalSerializers.create(type())));
     }
 
     private void writeObject(ObjectOutputStream out) throws IOException {
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
index 326b82aecb..26de3686bb 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
@@ -77,57 +77,109 @@ public class PredicateBuilder {
         return leaf(Equal.INSTANCE, idx, literal);
     }
 
+    public Predicate equal(Transform transform, Object literal) {
+        return leaf(Equal.INSTANCE, transform, literal);
+    }
+
     public Predicate notEqual(int idx, Object literal) {
         return leaf(NotEqual.INSTANCE, idx, literal);
     }
 
+    public Predicate notEqual(Transform transform, Object literal) {
+        return leaf(NotEqual.INSTANCE, transform, literal);
+    }
+
     public Predicate lessThan(int idx, Object literal) {
         return leaf(LessThan.INSTANCE, idx, literal);
     }
 
+    public Predicate lessThan(Transform transform, Object literal) {
+        return leaf(LessThan.INSTANCE, transform, literal);
+    }
+
     public Predicate lessOrEqual(int idx, Object literal) {
         return leaf(LessOrEqual.INSTANCE, idx, literal);
     }
 
+    public Predicate lessOrEqual(Transform transform, Object literal) {
+        return leaf(LessOrEqual.INSTANCE, transform, literal);
+    }
+
     public Predicate greaterThan(int idx, Object literal) {
         return leaf(GreaterThan.INSTANCE, idx, literal);
     }
 
+    public Predicate greaterThan(Transform transform, Object literal) {
+        return leaf(GreaterThan.INSTANCE, transform, literal);
+    }
+
     public Predicate greaterOrEqual(int idx, Object literal) {
         return leaf(GreaterOrEqual.INSTANCE, idx, literal);
     }
 
+    public Predicate greaterOrEqual(Transform transform, Object literal) {
+        return leaf(GreaterOrEqual.INSTANCE, transform, literal);
+    }
+
     public Predicate isNull(int idx) {
         return leaf(IsNull.INSTANCE, idx);
     }
 
+    public Predicate isNull(Transform transform) {
+        return leaf(IsNull.INSTANCE, transform);
+    }
+
     public Predicate isNotNull(int idx) {
         return leaf(IsNotNull.INSTANCE, idx);
     }
 
+    public Predicate isNotNull(Transform transform) {
+        return leaf(IsNotNull.INSTANCE, transform);
+    }
+
     public Predicate startsWith(int idx, Object patternLiteral) {
         return leaf(StartsWith.INSTANCE, idx, patternLiteral);
     }
 
+    public Predicate startsWith(Transform transform, Object patternLiteral) {
+        return leaf(StartsWith.INSTANCE, transform, patternLiteral);
+    }
+
     public Predicate endsWith(int idx, Object patternLiteral) {
         return leaf(EndsWith.INSTANCE, idx, patternLiteral);
     }
 
+    public Predicate endsWith(Transform transform, Object patternLiteral) {
+        return leaf(EndsWith.INSTANCE, transform, patternLiteral);
+    }
+
     public Predicate contains(int idx, Object patternLiteral) {
         return leaf(Contains.INSTANCE, idx, patternLiteral);
     }
 
-    public Predicate leaf(NullFalseLeafBinaryFunction function, int idx, Object literal) {
+    public Predicate contains(Transform transform, Object patternLiteral) {
+        return leaf(Contains.INSTANCE, transform, patternLiteral);
+    }
+
+    private Predicate leaf(NullFalseLeafBinaryFunction function, int idx, Object literal) {
         DataField field = rowType.getFields().get(idx);
         return new LeafPredicate(function, field.type(), idx, field.name(), singletonList(literal));
     }
 
-    public Predicate leaf(LeafUnaryFunction function, int idx) {
+    private Predicate leaf(LeafFunction function, Transform transform, Object literal) {
+        return TransformPredicate.of(transform, function, singletonList(literal));
+    }
+
+    private Predicate leaf(LeafUnaryFunction function, int idx) {
         DataField field = rowType.getFields().get(idx);
         return new LeafPredicate(
                 function, field.type(), idx, field.name(), Collections.emptyList());
     }
 
+    private Predicate leaf(LeafFunction function, Transform transform) {
+        return TransformPredicate.of(transform, function, Collections.emptyList());
+    }
+
     public Predicate in(int idx, List<Object> literals) {
         // For the IN predicate, 20 literals is the performance cutoff: expanding
         // more than 20 literals into an OR chain degrades performance.
@@ -143,6 +195,20 @@ public class PredicateBuilder {
         return or(equals);
     }
 
+    public Predicate in(Transform transform, List<Object> literals) {
+        // For the IN predicate, 20 literals is the performance cutoff: expanding
+        // more than 20 literals into an OR chain degrades performance.
+        if (literals.size() > 20) {
+            return TransformPredicate.of(transform, In.INSTANCE, literals);
+        }
+
+        List<Predicate> equals = new ArrayList<>(literals.size());
+        for (Object literal : literals) {
+            equals.add(equal(transform, literal));
+        }
+        return or(equals);
+    }
+
     public Predicate notIn(int idx, List<Object> literals) {
         return in(idx, literals).negate().get();
     }
@@ -155,6 +221,15 @@ public class PredicateBuilder {
                         lessOrEqual(idx, includedUpperBound)));
     }
 
+    public Predicate between(
+            Transform transform, Object includedLowerBound, Object includedUpperBound) {
+        return new CompoundPredicate(
+                And.INSTANCE,
+                Arrays.asList(
+                        greaterOrEqual(transform, includedLowerBound),
+                        lessOrEqual(transform, includedUpperBound)));
+    }
+
     public static Predicate and(Predicate... predicates) {
         return and(Arrays.asList(predicates));
     }
@@ -366,20 +441,26 @@ public class PredicateBuilder {
                 }
             }
             return Optional.of(new CompoundPredicate(compoundPredicate.function(), children));
-        } else {
-            LeafPredicate leafPredicate = (LeafPredicate) predicate;
-            int mapped = fieldIdxMapping[leafPredicate.index()];
-            if (mapped >= 0) {
-                return Optional.of(
-                        new LeafPredicate(
-                                leafPredicate.function(),
-                                leafPredicate.type(),
-                                mapped,
-                                leafPredicate.fieldName(),
-                                leafPredicate.literals()));
-            } else {
-                return Optional.empty();
+        } else if (predicate instanceof TransformPredicate) {
+            TransformPredicate transformPredicate = (TransformPredicate) predicate;
+            List<Object> inputs = transformPredicate.transform.inputs();
+            List<Object> newInputs = new ArrayList<>(inputs.size());
+            for (Object input : inputs) {
+                if (input instanceof FieldRef) {
+                    FieldRef fieldRef = (FieldRef) input;
+                    int mappedIndex = fieldIdxMapping[fieldRef.index()];
+                    if (mappedIndex >= 0) {
+                        newInputs.add(new FieldRef(mappedIndex, fieldRef.name(), fieldRef.type()));
+                    } else {
+                        return Optional.empty();
+                    }
+                } else {
+                    newInputs.add(input);
+                }
             }
+            return Optional.of(transformPredicate.withNewInputs(newInputs));
+        } else {
+            return Optional.empty();
         }
     }
 
@@ -392,8 +473,8 @@ public class PredicateBuilder {
             }
             return false;
         } else {
-            LeafPredicate leafPredicate = (LeafPredicate) predicate;
-            return fields.contains(leafPredicate.fieldName());
+            TransformPredicate transformPredicate = (TransformPredicate) predicate;
+            return fields.containsAll(transformPredicate.fieldNames());
         }
     }
 
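
A hedged sketch of the new Transform-aware overloads (schema and bounds are
made up), mirroring the partition-pruning case exercised by the Spark test at
the end of this diff:

    import java.util.Arrays

    import org.apache.paimon.data.BinaryString
    import org.apache.paimon.predicate.{ConcatTransform, FieldRef, PredicateBuilder}
    import org.apache.paimon.types.{DataTypes, RowType}

    val rowType = RowType.builder()
      .field("year", DataTypes.STRING())
      .field("month", DataTypes.STRING())
      .build()
    val builder = new PredicateBuilder(rowType)
    // CONCAT(year, '-', month): transform inputs mix field references and literals.
    val concat = new ConcatTransform(Arrays.asList[AnyRef](
      new FieldRef(0, "year", DataTypes.STRING()),
      BinaryString.fromString("-"),
      new FieldRef(1, "month", DataTypes.STRING())))
    val predicate = builder.between(
      concat,
      BinaryString.fromString("2025-07"),
      BinaryString.fromString("2025-08"))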
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java b/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java
index 3ab5c97e07..1324133c7e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java
@@ -32,4 +32,6 @@ public interface Transform extends Serializable {
     DataType outputType();
 
     Object transform(InternalRow row);
+
+    Transform withNewInputs(List<Object> inputs);
 }
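
A sketch of the withNewInputs contract (indices are illustrative): the
field-index remapping in PredicateBuilder above rebuilds each transform over
remapped FieldRefs, while literal inputs pass through unchanged.

    import java.util.Arrays

    import org.apache.paimon.data.BinaryString
    import org.apache.paimon.predicate.{ConcatTransform, FieldRef}
    import org.apache.paimon.types.DataTypes

    val concat = new ConcatTransform(Arrays.asList[AnyRef](
      new FieldRef(3, "year", DataTypes.STRING()),
      BinaryString.fromString("-"),
      new FieldRef(4, "month", DataTypes.STRING())))
    // Rebuild over remapped indices (3 -> 0, 4 -> 1); the "-" literal is kept as-is.
    val remapped = concat.withNewInputs(Arrays.asList[AnyRef](
      new FieldRef(0, "year", DataTypes.STRING()),
      BinaryString.fromString("-"),
      new FieldRef(1, "month", DataTypes.STRING())))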
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/TransformPredicate.java b/paimon-common/src/main/java/org/apache/paimon/predicate/TransformPredicate.java
index 266bb517d9..94a1600359 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/TransformPredicate.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/TransformPredicate.java
@@ -29,6 +29,7 @@ import org.apache.paimon.io.DataOutputViewStreamWrapper;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -38,20 +39,43 @@ public class TransformPredicate implements Predicate {
 
     private static final long serialVersionUID = 1L;
 
-    private final Transform transform;
-    private final LeafFunction function;
-    private transient List<Object> literals;
+    protected final Transform transform;
+    protected final LeafFunction function;
+    protected transient List<Object> literals;
 
-    public TransformPredicate(Transform transform, LeafFunction function, List<Object> literals) {
+    protected TransformPredicate(
+            Transform transform, LeafFunction function, List<Object> literals) {
         this.transform = transform;
         this.function = function;
         this.literals = literals;
     }
 
+    public static TransformPredicate of(
+            Transform transform, LeafFunction function, List<Object> literals) {
+        if (transform instanceof FieldTransform) {
+            return new LeafPredicate((FieldTransform) transform, function, literals);
+        }
+        return new TransformPredicate(transform, function, literals);
+    }
+
     public Transform transform() {
         return transform;
     }
 
+    public TransformPredicate withNewInputs(List<Object> newInputs) {
+        return TransformPredicate.of(transform.withNewInputs(newInputs), function, literals);
+    }
+
+    public List<String> fieldNames() {
+        List<String> names = new ArrayList<>();
+        for (Object input : transform.inputs()) {
+            if (input instanceof FieldRef) {
+                names.add(((FieldRef) input).name());
+            }
+        }
+        return names;
+    }
+
     @Override
     public boolean test(InternalRow row) {
         Object value = transform.transform(row);
diff --git a/paimon-common/src/test/java/org/apache/paimon/predicate/TransformPredicateTest.java b/paimon-common/src/test/java/org/apache/paimon/predicate/TransformPredicateTest.java
index 90d144f8d0..c2f6fa32be 100644
--- a/paimon-common/src/test/java/org/apache/paimon/predicate/TransformPredicateTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/predicate/TransformPredicateTest.java
@@ -76,6 +76,6 @@ class TransformPredicateTest {
         ConcatTransform transform = new ConcatTransform(inputs);
         List<Object> literals = new ArrayList<>();
         literals.add(BinaryString.fromString("ha-he"));
-        return new TransformPredicate(transform, Equal.INSTANCE, literals);
+        return TransformPredicate.of(transform, Equal.INSTANCE, literals);
     }
 }
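
The new factory keeps existing behavior intact for plain field comparisons: a
sketch of the dispatch (values are illustrative).

    import java.util.Collections

    import org.apache.paimon.data.BinaryString
    import org.apache.paimon.predicate.{Equal, FieldRef, FieldTransform, LeafPredicate, TransformPredicate}
    import org.apache.paimon.types.DataTypes

    val fieldTransform = new FieldTransform(new FieldRef(0, "name", DataTypes.STRING()))
    val p = TransformPredicate.of(
      fieldTransform,
      Equal.INSTANCE,
      Collections.singletonList[AnyRef](BinaryString.fromString("a")))
    // A bare field transform yields a LeafPredicate, so existing visitors keep
    // working; any other transform yields a plain TransformPredicate.
    assert(p.isInstanceOf[LeafPredicate])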
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBasePushDown.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBasePushDown.scala
index f56e7b6864..3f10202607 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBasePushDown.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBasePushDown.scala
@@ -23,7 +23,7 @@ import org.apache.paimon.types.RowType
 
 import org.apache.spark.sql.PaimonUtils
 import org.apache.spark.sql.connector.expressions.filter.{Predicate => SparkPredicate}
-import org.apache.spark.sql.connector.read.{SupportsPushDownLimit, SupportsPushDownRequiredColumns, SupportsPushDownV2Filters}
+import org.apache.spark.sql.connector.read.{SupportsPushDownLimit, SupportsPushDownV2Filters}
 import org.apache.spark.sql.sources.Filter
 
 import java.util.{List => JList}
@@ -56,7 +56,7 @@ trait PaimonBasePushDown extends SupportsPushDownV2Filters with SupportsPushDown
             pushable.append((predicate, paimonPredicate))
             if (paimonPredicate.visit(visitor)) {
               // We need to filter the stats using filter instead of predicate.
-              reserved.append(PaimonUtils.filterV2ToV1(predicate).get)
+              PaimonUtils.filterV2ToV1(predicate).map(reserved.append(_))
             } else {
               postScan.append(predicate)
             }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index 70fffc4d37..d4f0b0cfe0 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -27,8 +27,6 @@ import org.apache.paimon.spark.statistics.StatisticsHelper
 import org.apache.paimon.table.{DataTable, InnerTable}
 import org.apache.paimon.table.source.{InnerTableScan, Split}
 import org.apache.paimon.table.source.snapshot.TimeTravelUtil
-import org.apache.paimon.table.system.FilesTable
-import org.apache.paimon.utils.{SnapshotManager, TagManager}
 
 import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
 import org.apache.spark.sql.connector.read.{Batch, Scan, Statistics, SupportsReportStatistics}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index d3ff6d1a40..8d8d1825ba 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -191,7 +191,8 @@ case class PaimonScan(
     val converter = SparkV2FilterConverter(table.rowType())
     val partitionKeys = table.partitionKeys().asScala.toSeq
     val partitionFilter = predicates.flatMap {
-      case p if SparkV2FilterConverter.isSupportedRuntimeFilter(p, partitionKeys) =>
+      case p if converter.isSupportedRuntimeFilter(p, partitionKeys) =>
         converter.convert(p)
       case _ => None
     }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
index 87a8ddf993..84937e2fdd 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
@@ -18,21 +18,19 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.data.{BinaryString, Decimal, Timestamp}
-import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
-import org.apache.paimon.spark.util.shim.TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType
-import org.apache.paimon.types.{DataTypeRoot, DecimalType, RowType}
-import org.apache.paimon.types.DataTypeRoot._
+import org.apache.paimon.predicate.{FieldTransform, Predicate, PredicateBuilder, Transform}
+import org.apache.paimon.spark.util.SparkExpressionConverter.{toPaimonLiteral, toPaimonTransform}
+import org.apache.paimon.types.RowType
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.connector.expressions.{Literal, NamedReference}
+import org.apache.spark.sql.connector.expressions.Expression
+import org.apache.spark.sql.connector.expressions.Literal
 import org.apache.spark.sql.connector.expressions.filter.{And, Not, Or, Predicate => SparkPredicate}
 
 import scala.collection.JavaConverters._
 
 /** Conversion from [[SparkPredicate]] to [[Predicate]]. */
-case class SparkV2FilterConverter(rowType: RowType) {
+case class SparkV2FilterConverter(rowType: RowType) extends Logging {
 
   import org.apache.paimon.spark.SparkV2FilterConverter._
 
@@ -50,85 +48,78 @@ case class SparkV2FilterConverter(rowType: RowType) {
   private def convert(sparkPredicate: SparkPredicate): Predicate = {
     sparkPredicate.name() match {
       case EQUAL_TO =>
-        BinaryPredicate.unapply(sparkPredicate) match {
-          case Some((fieldName, literal)) =>
+        sparkPredicate match {
+          case BinaryPredicate(transform, literal) =>
             // TODO deal with isNaN
-            val index = fieldIndex(fieldName)
-            builder.equal(index, convertLiteral(index, literal))
+            builder.equal(transform, literal)
           case _ =>
             throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
         }
 
       case EQUAL_NULL_SAFE =>
-        BinaryPredicate.unapply(sparkPredicate) match {
-          case Some((fieldName, literal)) =>
-            val index = fieldIndex(fieldName)
+        sparkPredicate match {
+          case BinaryPredicate(transform, literal) =>
             if (literal == null) {
-              builder.isNull(index)
+              builder.isNull(transform)
             } else {
-              builder.equal(index, convertLiteral(index, literal))
+              builder.equal(transform, literal)
             }
           case _ =>
             throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
         }
 
       case GREATER_THAN =>
-        BinaryPredicate.unapply(sparkPredicate) match {
-          case Some((fieldName, literal)) =>
-            val index = fieldIndex(fieldName)
-            builder.greaterThan(index, convertLiteral(index, literal))
+        sparkPredicate match {
+          case BinaryPredicate(transform, literal) =>
+            builder.greaterThan(transform, literal)
           case _ =>
             throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
         }
 
       case GREATER_THAN_OR_EQUAL =>
-        BinaryPredicate.unapply(sparkPredicate) match {
-          case Some((fieldName, literal)) =>
-            val index = fieldIndex(fieldName)
-            builder.greaterOrEqual(index, convertLiteral(index, literal))
+        sparkPredicate match {
+          case BinaryPredicate(transform, literal) =>
+            builder.greaterOrEqual(transform, literal)
           case _ =>
             throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
         }
 
       case LESS_THAN =>
-        BinaryPredicate.unapply(sparkPredicate) match {
-          case Some((fieldName, literal)) =>
-            val index = fieldIndex(fieldName)
-            builder.lessThan(index, convertLiteral(index, literal))
+        sparkPredicate match {
+          case BinaryPredicate(transform, literal) =>
+            builder.lessThan(transform, literal)
           case _ =>
             throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
         }
 
       case LESS_THAN_OR_EQUAL =>
-        BinaryPredicate.unapply(sparkPredicate) match {
-          case Some((fieldName, literal)) =>
-            val index = fieldIndex(fieldName)
-            builder.lessOrEqual(index, convertLiteral(index, literal))
+        sparkPredicate match {
+          case BinaryPredicate(transform, literal) =>
+            builder.lessOrEqual(transform, literal)
           case _ =>
             throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
         }
 
       case IN =>
-        MultiPredicate.unapply(sparkPredicate) match {
-          case Some((fieldName, literals)) =>
-            val index = fieldIndex(fieldName)
-            builder.in(index, literals.map(convertLiteral(index, _)).toList.asJava)
+        sparkPredicate match {
+          case MultiPredicate(transform, literals) =>
+            builder.in(transform, literals.toList.asJava)
           case _ =>
             throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
         }
 
       case IS_NULL =>
-        UnaryPredicate.unapply(sparkPredicate) match {
-          case Some(fieldName) =>
-            builder.isNull(fieldIndex(fieldName))
+        sparkPredicate match {
+          case UnaryPredicate(transform) =>
+            builder.isNull(transform)
           case _ =>
             throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
         }
 
       case IS_NOT_NULL =>
-        UnaryPredicate.unapply(sparkPredicate) match {
-          case Some(fieldName) =>
-            builder.isNotNull(fieldIndex(fieldName))
+        sparkPredicate match {
+          case UnaryPredicate(transform) =>
+            builder.isNotNull(transform)
           case _ =>
             throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
         }
@@ -151,28 +142,25 @@ case class SparkV2FilterConverter(rowType: RowType) {
         }
 
       case STRING_START_WITH =>
-        BinaryPredicate.unapply(sparkPredicate) match {
-          case Some((fieldName, literal)) =>
-            val index = fieldIndex(fieldName)
-            builder.startsWith(index, convertLiteral(index, literal))
+        sparkPredicate match {
+          case BinaryPredicate(transform, literal) =>
+            builder.startsWith(transform, literal)
           case _ =>
             throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
         }
 
       case STRING_END_WITH =>
-        BinaryPredicate.unapply(sparkPredicate) match {
-          case Some((fieldName, literal)) =>
-            val index = fieldIndex(fieldName)
-            builder.endsWith(index, convertLiteral(index, literal))
+        sparkPredicate match {
+          case BinaryPredicate(transform, literal) =>
+            builder.endsWith(transform, literal)
           case _ =>
             throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
         }
 
       case STRING_CONTAINS =>
-        BinaryPredicate.unapply(sparkPredicate) match {
-          case Some((fieldName, literal)) =>
-            val index = fieldIndex(fieldName)
-            builder.contains(index, convertLiteral(index, literal))
+        sparkPredicate match {
+          case BinaryPredicate(transform, literal) =>
+            builder.contains(transform, literal)
           case _ =>
             throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
         }
@@ -182,106 +170,55 @@ case class SparkV2FilterConverter(rowType: RowType) {
     }
   }
 
-  private def fieldIndex(fieldName: String): Int = {
-    val index = rowType.getFieldIndex(fieldName)
-    // TODO: support nested field
-    if (index == -1) {
-      throw new UnsupportedOperationException(s"Nested field '$fieldName' is 
unsupported.")
-    }
-    index
-  }
-
-  private def convertLiteral(index: Int, value: Any): AnyRef = {
-    if (value == null) {
-      return null
-    }
-
-    val dataType = rowType.getTypeAt(index)
-    dataType.getTypeRoot match {
-      case BOOLEAN | BIGINT | DOUBLE | TINYINT | SMALLINT | INTEGER | FLOAT | DATE =>
-        value.asInstanceOf[AnyRef]
-      case DataTypeRoot.VARCHAR =>
-        BinaryString.fromString(value.toString)
-      case DataTypeRoot.DECIMAL =>
-        val decimalType = dataType.asInstanceOf[DecimalType]
-        val precision = decimalType.getPrecision
-        val scale = decimalType.getScale
-        Decimal.fromBigDecimal(
-          value.asInstanceOf[org.apache.spark.sql.types.Decimal].toJavaBigDecimal,
-          precision,
-          scale)
-      case DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
-        Timestamp.fromMicros(value.asInstanceOf[Long])
-      case DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE =>
-        if (treatPaimonTimestampTypeAsSparkTimestampType()) {
-          Timestamp.fromSQLTimestamp(DateTimeUtils.toJavaTimestamp(value.asInstanceOf[Long]))
-        } else {
-          Timestamp.fromMicros(value.asInstanceOf[Long])
-        }
-      case _ =>
-        throw new UnsupportedOperationException(
-          s"Convert value: $value to datatype: $dataType is unsupported.")
-    }
-  }
-}
-
-object SparkV2FilterConverter extends Logging {
-
-  private val EQUAL_TO = "="
-  private val EQUAL_NULL_SAFE = "<=>"
-  private val GREATER_THAN = ">"
-  private val GREATER_THAN_OR_EQUAL = ">="
-  private val LESS_THAN = "<"
-  private val LESS_THAN_OR_EQUAL = "<="
-  private val IN = "IN"
-  private val IS_NULL = "IS_NULL"
-  private val IS_NOT_NULL = "IS_NOT_NULL"
-  private val AND = "AND"
-  private val OR = "OR"
-  private val NOT = "NOT"
-  private val STRING_START_WITH = "STARTS_WITH"
-  private val STRING_END_WITH = "ENDS_WITH"
-  private val STRING_CONTAINS = "CONTAINS"
-
   private object UnaryPredicate {
-    def unapply(sparkPredicate: SparkPredicate): Option[String] = {
+    def unapply(sparkPredicate: SparkPredicate): Option[Transform] = {
       sparkPredicate.children() match {
-        case Array(n: NamedReference) => Some(toFieldName(n))
+        case Array(e: Expression) => toPaimonTransform(e, rowType)
         case _ => None
       }
     }
   }
 
   private object BinaryPredicate {
-    def unapply(sparkPredicate: SparkPredicate): Option[(String, Any)] = {
+    def unapply(sparkPredicate: SparkPredicate): Option[(Transform, Object)] = {
       sparkPredicate.children() match {
-        case Array(l: NamedReference, r: Literal[_]) => Some((toFieldName(l), r.value))
-        case Array(l: Literal[_], r: NamedReference) => Some((toFieldName(r), l.value))
+        case Array(e: Expression, r: Literal[_]) =>
+          toPaimonTransform(e, rowType) match {
+            case Some(transform) => Some(transform, toPaimonLiteral(r))
+            case _ => None
+          }
         case _ => None
       }
     }
   }
 
   private object MultiPredicate {
-    def unapply(sparkPredicate: SparkPredicate): Option[(String, Array[Any])] = {
+    def unapply(sparkPredicate: SparkPredicate): Option[(Transform, Seq[Object])] = {
       sparkPredicate.children() match {
-        case Array(first: NamedReference, rest @ _*)
+        case Array(e: Expression, rest @ _*)
             if rest.nonEmpty && rest.forall(_.isInstanceOf[Literal[_]]) =>
-          Some(toFieldName(first), rest.map(_.asInstanceOf[Literal[_]].value).toArray)
+          val literals = rest.map(_.asInstanceOf[Literal[_]])
+          if (literals.forall(_.dataType() == literals.head.dataType())) {
+            toPaimonTransform(e, rowType) match {
+              case Some(transform) => Some(transform, literals.map(toPaimonLiteral))
+              case _ => None
+            }
+          } else {
+            None
+          }
         case _ => None
       }
     }
   }
 
-  private def toFieldName(ref: NamedReference): String = ref.fieldNames().mkString(".")
-
   def isSupportedRuntimeFilter(
       sparkPredicate: SparkPredicate,
       partitionKeys: Seq[String]): Boolean = {
     sparkPredicate.name() match {
       case IN =>
-        MultiPredicate.unapply(sparkPredicate) match {
-          case Some((fieldName, _)) => partitionKeys.contains(fieldName)
+        sparkPredicate match {
+          case MultiPredicate(transform: FieldTransform, _) =>
+            partitionKeys.contains(transform.fieldRef().name())
           case _ =>
             logWarning(s"Convert $sparkPredicate is unsupported.")
             false
@@ -290,3 +227,23 @@ object SparkV2FilterConverter extends Logging {
     }
   }
 }
+
+object SparkV2FilterConverter extends Logging {
+
+  private val EQUAL_TO = "="
+  private val EQUAL_NULL_SAFE = "<=>"
+  private val GREATER_THAN = ">"
+  private val GREATER_THAN_OR_EQUAL = ">="
+  private val LESS_THAN = "<"
+  private val LESS_THAN_OR_EQUAL = "<="
+  private val IN = "IN"
+  private val IS_NULL = "IS_NULL"
+  private val IS_NOT_NULL = "IS_NOT_NULL"
+  private val AND = "AND"
+  private val OR = "OR"
+  private val NOT = "NOT"
+  private val STRING_START_WITH = "STARTS_WITH"
+  private val STRING_END_WITH = "ENDS_WITH"
+  private val STRING_CONTAINS = "CONTAINS"
+
+}
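
With the extractors above matching on arbitrary child expressions, the
converter now accepts V2 predicates whose left side is a transform, not just a
column. An illustrative call (the schema is made up; the V2 expressions are
built with Spark's public connector API):

    import org.apache.paimon.spark.SparkV2FilterConverter
    import org.apache.paimon.types.{DataTypes, RowType}

    import org.apache.spark.sql.connector.expressions.{Expression, Expressions, GeneralScalarExpression, LiteralValue}
    import org.apache.spark.sql.connector.expressions.filter.{Predicate => SparkPredicate}
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.unsafe.types.UTF8String

    val rowType = RowType.builder()
      .field("year", DataTypes.STRING())
      .field("month", DataTypes.STRING())
      .build()
    // V2 form of: CONCAT(year, month) = '202507'
    val concat = new GeneralScalarExpression(
      "CONCAT",
      Array[Expression](Expressions.column("year"), Expressions.column("month")))
    val eq = new SparkPredicate(
      "=",
      Array[Expression](concat, LiteralValue(UTF8String.fromString("202507"), StringType)))
    // Some(predicate) for supported shapes, None otherwise.
    val paimonPredicate = SparkV2FilterConverter(rowType).convert(eq)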
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index 8bc976b866..2bed4af873 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -26,7 +26,6 @@ import org.apache.paimon.types.RowType
 
 import org.apache.spark.sql.{Column, SparkSession}
 import org.apache.spark.sql.PaimonUtils.{normalizeExprs, translateFilterV2}
-import org.apache.spark.sql.catalyst.QueryPlanningTracker
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, Cast, Expression, GetStructField, Literal, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkExpressionConverter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkExpressionConverter.scala
new file mode 100644
index 0000000000..b32436caba
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkExpressionConverter.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.util
+
+import org.apache.paimon.data.{BinaryString, Decimal, Timestamp}
+import org.apache.paimon.predicate.{ConcatTransform, FieldRef, FieldTransform, Transform}
+import org.apache.paimon.spark.SparkTypeUtils
+import org.apache.paimon.spark.util.shim.TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType
+import org.apache.paimon.types.{DecimalType, RowType}
+import org.apache.paimon.types.DataTypeRoot._
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.connector.expressions.{Expression, GeneralScalarExpression, Literal, NamedReference}
+
+import scala.collection.JavaConverters._
+
+object SparkExpressionConverter {
+
+  // Supported transform names
+  private val CONCAT = "CONCAT"
+
+  /** Convert Spark [[Expression]] to Paimon [[Transform]], return None if not supported. */
+  def toPaimonTransform(exp: Expression, rowType: RowType): Option[Transform] = {
+    exp match {
+      case n: NamedReference => Some(new FieldTransform(toPaimonFieldRef(n, rowType)))
+      case s: GeneralScalarExpression =>
+        s.name() match {
+          case CONCAT =>
+            val inputs = exp.children().map {
+              case n: NamedReference => toPaimonFieldRef(n, rowType)
+              case l: Literal[_] => toPaimonLiteral(l)
+              case _ => return None
+            }
+            Some(new ConcatTransform(inputs.toList.asJava))
+          case _ => None
+        }
+      case _ => None
+    }
+  }
+
+  /** Convert Spark [[Literal]] to Paimon literal. */
+  def toPaimonLiteral(literal: Literal[_]): Object = {
+    if (literal == null) {
+      return null
+    }
+
+    if (literal.children().nonEmpty) {
+      throw new UnsupportedOperationException(s"Convert value: $literal is 
unsupported.")
+    }
+
+    val dataType = SparkTypeUtils.toPaimonType(literal.dataType())
+    val value = literal.value()
+    dataType.getTypeRoot match {
+      case BOOLEAN | BIGINT | DOUBLE | TINYINT | SMALLINT | INTEGER | FLOAT | DATE =>
+        value.asInstanceOf[AnyRef]
+      case VARCHAR =>
+        BinaryString.fromString(value.toString)
+      case DECIMAL =>
+        val decimalType = dataType.asInstanceOf[DecimalType]
+        val precision = decimalType.getPrecision
+        val scale = decimalType.getScale
+        Decimal.fromBigDecimal(
+          value.asInstanceOf[org.apache.spark.sql.types.Decimal].toJavaBigDecimal,
+          precision,
+          scale)
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
+        Timestamp.fromMicros(value.asInstanceOf[Long])
+      case TIMESTAMP_WITHOUT_TIME_ZONE =>
+        if (treatPaimonTimestampTypeAsSparkTimestampType()) {
+          Timestamp.fromSQLTimestamp(DateTimeUtils.toJavaTimestamp(value.asInstanceOf[Long]))
+        } else {
+          Timestamp.fromMicros(value.asInstanceOf[Long])
+        }
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"Convert value: $value to datatype: $dataType is unsupported.")
+    }
+  }
+
+  private def toPaimonFieldRef(ref: NamedReference, rowType: RowType): FieldRef = {
+    val fieldName = toFieldName(ref)
+    val f = rowType.getField(fieldName)
+    // Note: use the field index here, not the field id.
+    val index = rowType.getFieldIndex(fieldName)
+    if (index == -1) {
+      throw new UnsupportedOperationException(s"Nested field '$fieldName' is 
unsupported.")
+    }
+    new FieldRef(index, f.name(), f.`type`())
+  }
+
+  private def toFieldName(ref: NamedReference): String = ref.fieldNames().mkString(".")
+}
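
A small usage sketch of the literal conversion (value is illustrative): Spark
V2 literals carry Catalyst-internal values, so a string arrives as a
UTF8String and leaves as a Paimon BinaryString.

    import org.apache.paimon.spark.util.SparkExpressionConverter.toPaimonLiteral

    import org.apache.spark.sql.connector.expressions.LiteralValue
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.unsafe.types.UTF8String

    // Yields BinaryString.fromString("2025-07").
    val v = toPaimonLiteral(LiteralValue(UTF8String.fromString("2025-07"), StringType))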
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala
index f330fed3f3..94cbe0d00c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala
@@ -20,9 +20,8 @@ package org.apache.spark.sql.connector.catalog
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.{PaimonSparkSession, SparkSession}
+import org.apache.spark.sql.PaimonSparkSession
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalog
-import org.apache.spark.sql.connector.catalog.CatalogV2Util
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.paimon.ReflectUtils
 
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
index 8759f99f00..5a4ff36b6c 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
@@ -87,6 +87,48 @@ abstract class PaimonPushDownTestBase extends PaimonSparkTestBase {
     checkAnswer(spark.sql(q), Row(1, "a", "p1") :: Row(2, "b", "p1") :: Row(3, "c", "p2") :: Nil)
   }
 
+  test(s"Paimon push down: apply CONCAT") {
+    // Spark support push down CONCAT since Spark 3.4.
+    if (gteqSpark3_4) {
+      withTable("t") {
+        sql(
+          """
+            |CREATE TABLE t (id int, value int, year STRING, month STRING, day STRING, hour STRING)
+            |using paimon
+            |PARTITIONED BY (year, month, day, hour)
+            |""".stripMargin)
+
+        sql("""
+              |INSERT INTO t values
+              |(1, 100, '2024', '07', '15', '21'),
+              |(2, 200, '2025', '07', '15', '21'),
+              |(3, 300, '2025', '07', '16', '22'),
+              |(4, 400, '2025', '07', '16', '23'),
+              |(5, 440, '2025', '07', '16', '23'),
+              |(6, 500, '2025', '07', '17', '00'),
+              |(7, 600, '2025', '07', '17', '02')
+              |""".stripMargin)
+
+        val q =
+          """
+            |SELECT * FROM t
+            |WHERE CONCAT(year,'-',month,'-',day,'-',hour) BETWEEN '2025-07-16-21' AND '2025-07-17-01'
+            |ORDER BY id
+            |""".stripMargin
+        assert(!checkFilterExists(q))
+
+        checkAnswer(
+          spark.sql(q),
+          Seq(
+            Row(3, 300, "2025", "07", "16", "22"),
+            Row(4, 400, "2025", "07", "16", "23"),
+            Row(5, 440, "2025", "07", "16", "23"),
+            Row(6, 500, "2025", "07", "17", "00"))
+        )
+      }
+    }
+  }
+
   test("Paimon pushDown: limit for append-only tables with deletion vector") {
     withTable("dv_test") {
       spark.sql(
