This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 69ae696159 [core][spark] Introduce cast transform (#6581)
69ae696159 is described below

commit 69ae69615902436d9ad333593f4c488125cb8c12
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Nov 11 10:51:03 2025 +0800

    [core][spark] Introduce cast transform (#6581)
---
 .../org/apache/paimon/predicate/CastTransform.java | 112 +++++++++++++++++++++
 .../org/apache/paimon/spark/PaimonBaseScan.scala   |  10 +-
 .../spark/util/SparkExpressionConverter.scala      |  48 +++++----
 .../paimon/spark/sql/PaimonPushDownTestBase.scala  |  23 +++++
 4 files changed, 174 insertions(+), 19 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/CastTransform.java b/paimon-common/src/main/java/org/apache/paimon/predicate/CastTransform.java
new file mode 100644
index 0000000000..37141011f3
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/CastTransform.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.casting.CastExecutor;
+import org.apache.paimon.casting.CastExecutors;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.DataType;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.paimon.utils.InternalRowUtils.get;
+
+/** Transform that casts a field to a new type. */
+public class CastTransform implements Transform {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FieldRef fieldRef;
+    private final DataType type;
+    private transient CastExecutor<Object, Object> cast;
+
+    private CastTransform(FieldRef fieldRef, DataType type, CastExecutor<Object, Object> cast) {
+        this.fieldRef = fieldRef;
+        this.type = type;
+        this.cast = cast;
+    }
+
+    public static Optional<Transform> tryCreate(FieldRef fieldRef, DataType type) {
+        if (fieldRef.type().equals(type)) {
+            return Optional.of(new FieldTransform(fieldRef));
+        }
+
+        @SuppressWarnings("unchecked")
+        CastExecutor<Object, Object> cast =
+                (CastExecutor<Object, Object>) CastExecutors.resolve(fieldRef.type(), type);
+        if (cast == null) {
+            return Optional.empty();
+        } else {
+            return Optional.of(new CastTransform(fieldRef, type, cast));
+        }
+    }
+
+    @Override
+    public List<Object> inputs() {
+        return Collections.singletonList(fieldRef);
+    }
+
+    @Override
+    public DataType outputType() {
+        return type;
+    }
+
+    @Override
+    public Object transform(InternalRow row) {
+        return cast.cast(get(row, fieldRef.index(), fieldRef.type()));
+    }
+
+    @Override
+    public Transform copyWithNewInputs(List<Object> inputs) {
+        assert inputs.size() == 1;
+        return new CastTransform((FieldRef) inputs.get(0), type, cast);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        CastTransform that = (CastTransform) o;
+        return Objects.equals(fieldRef, that.fieldRef) && Objects.equals(type, that.type);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(fieldRef, type);
+    }
+
+    @Override
+    public String toString() {
+        return "CAST( " + fieldRef + " AS " + type + ")";
+    }
+
+    private void readObject(java.io.ObjectInputStream in)
+            throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        @SuppressWarnings("unchecked")
+        CastExecutor<Object, Object> resolved =
+                (CastExecutor<Object, Object>) CastExecutors.resolve(fieldRef.type(), type);
+        this.cast = resolved;
+    }
+}
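
For readers skimming the diff: a minimal sketch of how the new factory is
meant to be called (Scala; the FieldRef constructor shape and the column
index are assumptions for illustration):

    import java.util.Optional
    import org.apache.paimon.predicate.{CastTransform, FieldRef, Transform}
    import org.apache.paimon.types.DataTypes

    // Hypothetical field reference: column "dt" at index 2, typed STRING.
    val dt = new FieldRef(2, "dt", DataTypes.STRING())

    // Types differ, so a CastExecutor is resolved and a CastTransform is
    // returned; it evaluates CAST(dt AS INT) row by row via transform(row).
    val castDt: Optional[Transform] = CastTransform.tryCreate(dt, DataTypes.INT())

    // Identical source and target types short-circuit to a FieldTransform.
    val noop: Optional[Transform] = CastTransform.tryCreate(dt, DataTypes.STRING())

    // Unresolvable casts yield Optional.empty(), so callers can skip pushdown.

Note that the CastExecutor field is transient and readObject re-resolves it
after deserialization, presumably because CastExecutor itself is not
Serializable.
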
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index d4f0b0cfe0..f2a12f1b41 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -128,6 +128,13 @@ abstract class PaimonBaseScan(
     } else {
       ""
     }
+
+    val reservedFiltersStr = if (reservedFilters.nonEmpty) {
+      ", ReservedFilters: [" + reservedFilters.mkString(",") + "]"
+    } else {
+      ""
+    }
+
     val pushedTopNFilterStr = if (pushDownTopN.nonEmpty) {
       s", PushedTopNFilter: [${pushDownTopN.get.toString}]"
     } else {
@@ -169,7 +176,8 @@ abstract class PaimonBaseScan(
       ""
     }
 
-    s"PaimonScan: [${table.name}]" + latestSnapshotIdStr + 
currentSnapshotIdStr + pushedFiltersStr + pushedTopNFilterStr +
+    s"PaimonScan: [${table.name}]" + latestSnapshotIdStr + 
currentSnapshotIdStr +
+      pushedFiltersStr + reservedFiltersStr + pushedTopNFilterStr +
       pushDownLimit.map(limit => s", Limit: [$limit]").getOrElse("")
   }
 }
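
The reservedFiltersStr change above only affects the human-readable scan
description: when reservedFilters is non-empty, they are now listed after the
pushed filters. A hypothetical rendering (filter formatting is illustrative):

    PaimonScan: [t], PushedFilters: [EqualTo(dt, 1)], ReservedFilters: [EqualTo(dt, 1)]
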
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkExpressionConverter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkExpressionConverter.scala
index 71bbea254c..32409544b6 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkExpressionConverter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkExpressionConverter.scala
@@ -19,43 +19,55 @@
 package org.apache.paimon.spark.util
 
 import org.apache.paimon.data.{BinaryString, Decimal, Timestamp}
-import org.apache.paimon.predicate.{ConcatTransform, FieldRef, FieldTransform, Transform, UpperTransform}
-import org.apache.paimon.spark.SparkTypeUtils
+import org.apache.paimon.predicate._
+import org.apache.paimon.spark.{PaimonImplicits, SparkTypeUtils}
 import org.apache.paimon.spark.util.shim.TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType
 import org.apache.paimon.types.{DecimalType, RowType}
 import org.apache.paimon.types.DataTypeRoot._
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.connector.expressions.{Expression, GeneralScalarExpression, Literal, NamedReference}
+import org.apache.spark.sql.connector.expressions.{Cast, Expression, GeneralScalarExpression, Literal, NamedReference}
 
 import scala.collection.JavaConverters._
 
 object SparkExpressionConverter {
 
-  // Supported transform names
+  import PaimonImplicits._
+
+  // Supported general scalar transform names
   private val CONCAT = "CONCAT"
   private val UPPER = "UPPER"
 
   /** Convert Spark [[Expression]] to Paimon [[Transform]], return None if not supported. */
   def toPaimonTransform(exp: Expression, rowType: RowType): Option[Transform] = {
+
+    def convertChildren(children: Seq[Expression]) = {
+      val converted = children.map {
+        case n: NamedReference => Some(toPaimonFieldRef(n, rowType))
+        case l: Literal[_] => Some(toPaimonLiteral(l))
+        case _ => None
+      }
+      if (converted.exists(_.isEmpty)) {
+        None
+      } else {
+        Some(converted.map(_.get).asJava)
+      }
+    }
+
     exp match {
       case n: NamedReference => Some(new FieldTransform(toPaimonFieldRef(n, rowType)))
       case s: GeneralScalarExpression =>
         s.name() match {
-          case CONCAT =>
-            val inputs = exp.children().map {
-              case n: NamedReference => toPaimonFieldRef(n, rowType)
-              case l: Literal[_] => toPaimonLiteral(l)
-              case _ => return None
-            }
-            Some(new ConcatTransform(inputs.toList.asJava))
-          case UPPER =>
-            val inputs = exp.children().map {
-              case n: NamedReference => toPaimonFieldRef(n, rowType)
-              case l: Literal[_] => toPaimonLiteral(l)
-              case _ => return None
-            }
-            Some(new UpperTransform(inputs.toList.asJava))
+          case CONCAT => convertChildren(s.children()).map(i => new ConcatTransform(i))
+          case UPPER => convertChildren(s.children()).map(i => new UpperTransform(i))
+          case _ => None
+        }
+      case c: Cast =>
+        c.expression() match {
+          case n: NamedReference =>
+            CastTransform.tryCreate(
+              toPaimonFieldRef(n, rowType),
+              SparkTypeUtils.toPaimonType(c.dataType()))
           case _ => None
         }
       case _ => None
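
A rough sketch of what the new Cast branch accepts (Spark 3.4+ connector
expressions; the row type mirrors the test table added below and is an
assumption for illustration):

    import org.apache.paimon.spark.util.SparkExpressionConverter
    import org.apache.paimon.types.{DataType, DataTypes, RowType}
    import org.apache.spark.sql.connector.expressions.{Cast, FieldReference}
    import org.apache.spark.sql.types.IntegerType

    // Schema (id INT, value INT, dt STRING), matching the test table.
    val rowType: RowType = RowType.of(
      Array[DataType](DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()),
      Array("id", "value", "dt"))

    // Spark's V2 pushdown may hand over CAST(dt AS INT); the converter now
    // maps it to a Paimon CastTransform instead of rejecting the expression.
    val sparkCast = new Cast(FieldReference("dt"), IntegerType)
    val transform = SparkExpressionConverter.toPaimonTransform(sparkCast, rowType)
    // Some(CastTransform) when Paimon can resolve STRING -> INT, None otherwise.
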
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
index be2bf18bef..c23694e1d1 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
@@ -159,6 +159,29 @@ abstract class PaimonPushDownTestBase extends PaimonSparkTestBase {
     }
   }
 
+  test(s"Paimon push down: apply CAST") {
+    if (gteqSpark3_4) {
+      withSparkSQLConf("spark.sql.ansi.enabled" -> "true") {
+        withTable("t") {
+          sql("""
+                |CREATE TABLE t (id int, value int, dt STRING)
+                |using paimon
+                |PARTITIONED BY (dt)
+                |""".stripMargin)
+
+          sql("""
+                |INSERT INTO t values
+                |(1, 100, '1')
+                |""".stripMargin)
+
+          val q = "SELECT * FROM t WHERE dt = 1"
+          assert(!checkFilterExists(q))
+          checkAnswer(sql(q), Seq(Row(1, 100, "1")))
+        }
+      }
+    }
+  }
+
   test("Paimon pushDown: limit for append-only tables with deletion vector") {
     withTable("dv_test") {
       spark.sql(
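
To reproduce the new test interactively (spark-shell with a Paimon catalog
configured; the session setup is an assumption):

    spark.conf.set("spark.sql.ansi.enabled", "true")
    spark.sql("CREATE TABLE t (id INT, value INT, dt STRING) USING paimon PARTITIONED BY (dt)")
    spark.sql("INSERT INTO t VALUES (1, 100, '1')")
    // With ANSI mode on, the analyzed predicate contains a cast of dt; this
    // commit converts it to a CastTransform evaluated inside the Paimon scan,
    // so the physical plan should no longer contain a post-scan Filter.
    spark.sql("SELECT * FROM t WHERE dt = 1").explain()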
