This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new dbd129d3c7 [spark] Introduce SparkV2FilterConverter (#4915)
dbd129d3c7 is described below

commit dbd129d3c7c6a7c39210ab47efc8c1275c7d7ce6
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Jan 17 10:59:37 2025 +0800

    [spark] Introduce SparkV2FilterConverter (#4915)
---
 .../spark/sql/SparkV2FilterConverterTest.scala     |  21 ++
 .../spark/sql/SparkV2FilterConverterTest.scala     |  21 ++
 .../spark/sql/SparkV2FilterConverterTest.scala     |  21 ++
 .../spark/sql/SparkV2FilterConverterTest.scala     |  21 ++
 .../apache/paimon/spark/SparkFilterConverter.java  |   4 +-
 .../paimon/spark/SparkV2FilterConverter.scala      | 236 +++++++++++++++++++++
 .../scala/org/apache/spark/sql/PaimonUtils.scala   |  16 +-
 .../apache/paimon/spark/PaimonSparkTestBase.scala  |   1 +
 .../spark/sql/SparkV2FilterConverterTestBase.scala | 224 +++++++++++++++++++
 9 files changed, 560 insertions(+), 5 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala
 
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala
new file mode 100644
index 0000000000..21c4c8a495
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+/** Runs the shared SparkV2FilterConverterTestBase cases in the paimon-spark-3.3 module. */
+class SparkV2FilterConverterTest extends SparkV2FilterConverterTestBase
diff --git 
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala
 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala
new file mode 100644
index 0000000000..21c4c8a495
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+/** Runs the shared SparkV2FilterConverterTestBase cases in the paimon-spark-3.4 module. */
+class SparkV2FilterConverterTest extends SparkV2FilterConverterTestBase
diff --git 
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala
 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala
new file mode 100644
index 0000000000..21c4c8a495
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+/** Runs the shared SparkV2FilterConverterTestBase cases in the paimon-spark-3.5 module. */
+class SparkV2FilterConverterTest extends SparkV2FilterConverterTestBase
diff --git 
a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala
 
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala
new file mode 100644
index 0000000000..21c4c8a495
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+/** Runs the shared SparkV2FilterConverterTestBase cases in the paimon-spark-4.0 module. */
+class SparkV2FilterConverterTest extends SparkV2FilterConverterTestBase
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java
index 6b9375e556..2050c937c6 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java
@@ -105,10 +105,10 @@ public class SparkFilterConverter {
             return builder.equal(index, literal);
         } else if (filter instanceof EqualNullSafe) {
             EqualNullSafe eq = (EqualNullSafe) filter;
+            int index = fieldIndex(eq.attribute());
             if (eq.value() == null) {
-                return builder.isNull(fieldIndex(eq.attribute()));
+                return builder.isNull(index);
             } else {
-                int index = fieldIndex(eq.attribute());
                 Object literal = convertLiteral(index, eq.value());
                 return builder.equal(index, literal);
             }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
new file mode 100644
index 0000000000..11ef302672
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark
+
+import org.apache.paimon.data.{BinaryString, Decimal, Timestamp}
+import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
+import org.apache.paimon.types.{DataTypeRoot, DecimalType, RowType}
+import org.apache.paimon.types.DataTypeRoot._
+
+import org.apache.spark.sql.connector.expressions.{Literal, NamedReference}
+import org.apache.spark.sql.connector.expressions.filter.{And, Not, Or, 
Predicate => SparkPredicate}
+
+import scala.collection.JavaConverters._
+
+/**
+ * Conversion from [[SparkPredicate]] to [[Predicate]].
+ *
+ * Supported are comparisons between a top-level field of `rowType` and literal values, null
+ * checks, string prefix / suffix / containment checks and AND / OR / NOT combinations of them.
+ * Unsupported predicates and operand shapes are rejected with an
+ * [[UnsupportedOperationException]].
+ *
+ * @param rowType
+ *   row type that supplies the field positions and field types of the built predicates
+ */
+case class SparkV2FilterConverter(rowType: RowType) {
+
+  import org.apache.paimon.spark.SparkV2FilterConverter._
+
+  val builder: PredicateBuilder = new PredicateBuilder(rowType)
+
+  /**
+   * Converts a Spark V2 predicate into the corresponding Paimon predicate.
+   *
+   * @param sparkPredicate
+   *   the Spark predicate to convert
+   * @return
+   *   the Paimon predicate created through [[builder]]
+   * @throws UnsupportedOperationException
+   *   if the predicate name, the shape of its operands, the referenced field or the type of the
+   *   compared field is not supported, or if a NOT child cannot be negated
+   */
+  def convert(sparkPredicate: SparkPredicate): Predicate = {
+    // Dispatch on the field-first form of the operator so that `literal op field` is never
+    // treated as `field op literal`.
+    columnFirstName(sparkPredicate) match {
+      case EQUAL_TO =>
+        // TODO deal with isNaN
+        convertBinary(sparkPredicate)(builder.equal(_, _))
+
+      case EQUAL_NULL_SAFE =>
+        sparkPredicate match {
+          case BinaryPredicate(fieldName, literal) =>
+            val index = fieldIndex(fieldName)
+            // The null check is done on the raw Spark value, before any literal conversion.
+            if (literal == null) {
+              builder.isNull(index)
+            } else {
+              builder.equal(index, convertLiteral(index, literal))
+            }
+          case _ => unsupported(sparkPredicate)
+        }
+
+      case GREATER_THAN =>
+        convertBinary(sparkPredicate)(builder.greaterThan(_, _))
+
+      case GREATER_THAN_OR_EQUAL =>
+        convertBinary(sparkPredicate)(builder.greaterOrEqual(_, _))
+
+      case LESS_THAN =>
+        convertBinary(sparkPredicate)(builder.lessThan(_, _))
+
+      case LESS_THAN_OR_EQUAL =>
+        convertBinary(sparkPredicate)(builder.lessOrEqual(_, _))
+
+      case IN =>
+        sparkPredicate match {
+          case MultiPredicate(fieldName, literals) =>
+            val index = fieldIndex(fieldName)
+            builder.in(index, literals.map(convertLiteral(index, _)).toList.asJava)
+          case _ => unsupported(sparkPredicate)
+        }
+
+      case IS_NULL =>
+        sparkPredicate match {
+          case UnaryPredicate(fieldName) => builder.isNull(fieldIndex(fieldName))
+          case _ => unsupported(sparkPredicate)
+        }
+
+      case IS_NOT_NULL =>
+        sparkPredicate match {
+          case UnaryPredicate(fieldName) => builder.isNotNull(fieldIndex(fieldName))
+          case _ => unsupported(sparkPredicate)
+        }
+
+      case AND =>
+        sparkPredicate match {
+          case and: And => PredicateBuilder.and(convert(and.left()), convert(and.right()))
+          case _ => unsupported(sparkPredicate)
+        }
+
+      case OR =>
+        sparkPredicate match {
+          case or: Or => PredicateBuilder.or(convert(or.left()), convert(or.right()))
+          case _ => unsupported(sparkPredicate)
+        }
+
+      case NOT =>
+        sparkPredicate match {
+          case not: Not =>
+            // negate() yields an empty Optional when the child has no negated form.
+            val negated = convert(not.child()).negate()
+            if (negated.isPresent) negated.get() else unsupported(sparkPredicate)
+          case _ => unsupported(sparkPredicate)
+        }
+
+      case STRING_START_WITH =>
+        convertBinary(sparkPredicate)(builder.startsWith(_, _))
+
+      case STRING_END_WITH =>
+        convertBinary(sparkPredicate)(builder.endsWith(_, _))
+
+      case STRING_CONTAINS =>
+        convertBinary(sparkPredicate)(builder.contains(_, _))
+
+      // TODO: AlwaysTrue, AlwaysFalse
+      case _ => unsupported(sparkPredicate)
+    }
+  }
+
+  /** Fails the conversion of `sparkPredicate`; never returns normally. */
+  private def unsupported(sparkPredicate: SparkPredicate): Nothing = {
+    throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
+  }
+
+  /**
+   * Returns the operator name to dispatch on.
+   *
+   * [[BinaryPredicate]] extracts the field and the literal in either operand order, so for
+   * `literal op field` the ordering operators are mirrored here (`1 < a` is handled as `a > 1`).
+   * `=` and `<=>` are symmetric and stay unchanged. The string predicates have no field-first
+   * equivalent when the literal is the first operand and are rejected.
+   */
+  private def columnFirstName(sparkPredicate: SparkPredicate): String = {
+    val name = sparkPredicate.name()
+    sparkPredicate.children() match {
+      case Array(_: Literal[_], _: NamedReference) =>
+        name match {
+          case GREATER_THAN => LESS_THAN
+          case GREATER_THAN_OR_EQUAL => LESS_THAN_OR_EQUAL
+          case LESS_THAN => GREATER_THAN
+          case LESS_THAN_OR_EQUAL => GREATER_THAN_OR_EQUAL
+          case STRING_START_WITH | STRING_END_WITH | STRING_CONTAINS =>
+            unsupported(sparkPredicate)
+          case _ => name
+        }
+      case _ => name
+    }
+  }
+
+  /**
+   * Converts a field-versus-literal predicate with `build`, which receives the field index and
+   * the literal converted to its Paimon representation. Any other operand shape is rejected.
+   */
+  private def convertBinary(sparkPredicate: SparkPredicate)(
+      build: (Int, AnyRef) => Predicate): Predicate = {
+    sparkPredicate match {
+      case BinaryPredicate(fieldName, literal) =>
+        val index = fieldIndex(fieldName)
+        build(index, convertLiteral(index, literal))
+      case _ => unsupported(sparkPredicate)
+    }
+  }
+
+  /** Extracts the field name of a predicate whose only operand is a field reference. */
+  private object UnaryPredicate {
+    def unapply(sparkPredicate: SparkPredicate): Option[String] = {
+      sparkPredicate.children() match {
+        case Array(n: NamedReference) => Some(toFieldName(n))
+        case _ => None
+      }
+    }
+  }
+
+  /**
+   * Extracts (field name, raw literal value) from a predicate with exactly one field reference
+   * and one literal operand, in either order.
+   */
+  private object BinaryPredicate {
+    def unapply(sparkPredicate: SparkPredicate): Option[(String, Any)] = {
+      sparkPredicate.children() match {
+        case Array(l: NamedReference, r: Literal[_]) => Some((toFieldName(l), r.value))
+        case Array(l: Literal[_], r: NamedReference) => Some((toFieldName(r), l.value))
+        case _ => None
+      }
+    }
+  }
+
+  /**
+   * Extracts (field name, raw literal values) from a predicate whose first operand is a field
+   * reference and whose remaining operands are one or more literals.
+   */
+  private object MultiPredicate {
+    def unapply(sparkPredicate: SparkPredicate): Option[(String, Array[Any])] = {
+      sparkPredicate.children() match {
+        case Array(first: NamedReference, rest @ _*)
+            if rest.nonEmpty && rest.forall(_.isInstanceOf[Literal[_]]) =>
+          Some((toFieldName(first), rest.map(_.asInstanceOf[Literal[_]].value).toArray))
+        case _ => None
+      }
+    }
+  }
+
+  /**
+   * Looks up the position of `fieldName` in `rowType`.
+   *
+   * @throws UnsupportedOperationException
+   *   if `rowType` reports -1 for the name
+   */
+  private def fieldIndex(fieldName: String): Int = {
+    val index = rowType.getFieldIndex(fieldName)
+    // TODO: support nested field
+    if (index == -1) {
+      throw new UnsupportedOperationException(s"Nested field '$fieldName' is unsupported.")
+    }
+    index
+  }
+
+  /**
+   * Converts a raw Spark literal value into the value expected for the field at `index`.
+   *
+   * @return
+   *   null for a null `value`, otherwise the converted literal
+   * @throws UnsupportedOperationException
+   *   if the type root of the field is not handled
+   */
+  private def convertLiteral(index: Int, value: Any): AnyRef = {
+    if (value == null) {
+      null
+    } else {
+      val dataType = rowType.getTypeAt(index)
+      dataType.getTypeRoot match {
+        case BOOLEAN | BIGINT | DOUBLE | TINYINT | SMALLINT | INTEGER | FLOAT | DATE =>
+          // These values are passed through unchanged, only boxed.
+          value.asInstanceOf[AnyRef]
+        case DataTypeRoot.VARCHAR =>
+          BinaryString.fromString(value.toString)
+        case DataTypeRoot.DECIMAL =>
+          val decimalType = dataType.asInstanceOf[DecimalType]
+          Decimal.fromBigDecimal(
+            value.asInstanceOf[org.apache.spark.sql.types.Decimal].toJavaBigDecimal,
+            decimalType.getPrecision,
+            decimalType.getScale)
+        case DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE |
+            DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE =>
+          // The Long value is interpreted as microseconds.
+          Timestamp.fromMicros(value.asInstanceOf[Long])
+        case _ =>
+          throw new UnsupportedOperationException(
+            s"Convert value: $value to datatype: $dataType is unsupported.")
+      }
+    }
+  }
+
+  /** Joins the name parts of a reference with dots. */
+  private def toFieldName(ref: NamedReference): String = ref.fieldNames().mkString(".")
+}
+
+/** Predicate names that the `convert` method of the companion class dispatches on. */
+object SparkV2FilterConverter {
+
+  // Comparisons between a field and a single literal.
+  private val EQUAL_TO = "="
+  private val EQUAL_NULL_SAFE = "<=>"
+  private val GREATER_THAN = ">"
+  private val GREATER_THAN_OR_EQUAL = ">="
+  private val LESS_THAN = "<"
+  private val LESS_THAN_OR_EQUAL = "<="
+
+  // Membership in a list of literals.
+  private val IN = "IN"
+
+  // Null checks on a single field.
+  private val IS_NULL = "IS_NULL"
+  private val IS_NOT_NULL = "IS_NOT_NULL"
+
+  // Logical combinations of other predicates.
+  private val AND = "AND"
+  private val OR = "OR"
+  private val NOT = "NOT"
+
+  // String matching between a field and a literal.
+  private val STRING_START_WITH = "STARTS_WITH"
+  private val STRING_END_WITH = "ENDS_WITH"
+  private val STRING_CONTAINS = "CONTAINS"
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
index cc49e787dc..d01a840f8e 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
@@ -24,8 +24,10 @@ import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.connector.expressions.{FieldReference, 
NamedReference}
+import org.apache.spark.sql.connector.expressions.FieldReference
+import org.apache.spark.sql.connector.expressions.filter.Predicate
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.translateFilterV2WithMapping
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.PartitioningUtils
@@ -68,8 +70,16 @@ object PaimonUtils {
     DataSourceStrategy.translateFilter(predicate, 
supportNestedPredicatePushdown)
   }
 
-  def fieldReference(name: String): NamedReference = {
-    FieldReference(Seq(name))
+  def translateFilterV2(predicate: Expression): Option[Predicate] = {
+    translateFilterV2WithMapping(predicate, None)
+  }
+
+  def fieldReference(name: String): FieldReference = {
+    fieldReference(Seq(name))
+  }
+
+  def fieldReference(parts: Seq[String]): FieldReference = {
+    FieldReference(parts)
   }
 
   def bytesToString(size: Long): String = {
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index 9a1647da81..47f0c5a7d3 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -81,6 +81,7 @@ class PaimonSparkTestBase
     super.beforeAll()
     spark.sql(s"USE paimon")
     spark.sql(s"CREATE DATABASE IF NOT EXISTS paimon.$dbName0")
+    spark.sql(s"USE paimon.$dbName0")
   }
 
   override protected def afterAll(): Unit = {
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
new file mode 100644
index 0000000000..b9cbc29b3a
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.data.{BinaryString, Decimal, Timestamp}
+import org.apache.paimon.predicate.PredicateBuilder
+import org.apache.paimon.spark.{PaimonSparkTestBase, SparkV2FilterConverter}
+import org.apache.paimon.types.RowType
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.PaimonUtils.translateFilterV2
+import org.apache.spark.sql.catalyst.plans.logical.Filter
+import org.apache.spark.sql.connector.expressions.filter.Predicate
+
+import java.time.{LocalDate, LocalDateTime}
+
+import scala.collection.JavaConverters._
+
+/**
+ * Shared test cases for [[SparkV2FilterConverter]].
+ *
+ * Every case runs a query against a Paimon table, takes the filter condition of the optimized
+ * plan, translates it into a Spark V2 predicate and checks the Paimon predicate produced by the
+ * converter.
+ */
+abstract class SparkV2FilterConverterTestBase extends PaimonSparkTestBase {
+
+  // Constraint propagation is switched off because the not null filters Spark would generate
+  // automatically impact the conditions under test.
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.sql.constraintPropagation.enabled", "false")
+  }
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    // The column order fixes the field indexes used by the expected predicates below.
+    sql("""
+          |CREATE TABLE test_tbl (
+          | string_col STRING,
+          | byte_col BYTE,
+          | short_col SHORT,
+          | int_col INT,
+          | long_col LONG,
+          | float_col FLOAT,
+          | double_col DOUBLE,
+          | decimal_col DECIMAL(10, 5),
+          | boolean_col BOOLEAN,
+          | date_col DATE,
+          | binary BINARY
+          |) USING paimon
+          |""".stripMargin)
+  }
+
+  override protected def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS test_tbl")
+    super.afterAll()
+  }
+
+  lazy val rowType: RowType = loadTable("test_tbl").rowType()
+
+  lazy val builder = new PredicateBuilder(rowType)
+
+  lazy val converter: SparkV2FilterConverter = SparkV2FilterConverter(rowType)
+
+  test("V2Filter: all types") {
+    checkV2Filter("string_col = 'hello'")(builder.equal(0, BinaryString.fromString("hello")))
+    checkV2Filter("byte_col = 1")(builder.equal(1, 1.toByte))
+    checkV2Filter("short_col = 1")(builder.equal(2, 1.toShort))
+    checkV2Filter("int_col = 1")(builder.equal(3, 1))
+    checkV2Filter("long_col = 1")(builder.equal(4, 1L))
+    checkV2Filter("float_col = 1.0")(builder.equal(5, 1.0f))
+    checkV2Filter("double_col = 1.0")(builder.equal(6, 1.0d))
+
+    checkV2Filter("decimal_col = 12.12345")(
+      builder.equal(7, Decimal.fromBigDecimal(new java.math.BigDecimal("12.12345"), 10, 5)))
+
+    checkV2Filter("boolean_col = true")(builder.equal(8, true))
+
+    // The expected date literal is the number of days since the epoch.
+    checkV2Filter("date_col = cast('2025-01-15' as date)")(
+      builder.equal(9, LocalDate.parse("2025-01-15").toEpochDay.toInt))
+
+    // Binary literals are expected to be rejected by the converter.
+    intercept[UnsupportedOperationException] {
+      converter.convert(v2Filter("binary = binary('b1')"))
+    }
+  }
+
+  test("V2Filter: timestamp and timestamp_ntz") {
+    withTimeZone("Asia/Shanghai") {
+      withTable("ts_tbl", "ts_ntz_tbl") {
+        sql("CREATE TABLE ts_tbl (ts_col TIMESTAMP) USING paimon")
+        val tsRowType = loadTable("ts_tbl").rowType()
+        val tsPredicate = SparkV2FilterConverter(tsRowType).convert(
+          v2Filter("ts_col = timestamp'2025-01-15 00:00:00.123'", "ts_tbl"))
+        // The expected value is eight hours earlier than the literal written in the session
+        // time zone Asia/Shanghai.
+        val expectedTs =
+          Timestamp.fromLocalDateTime(LocalDateTime.parse("2025-01-14T16:00:00.123"))
+        assert(tsPredicate == new PredicateBuilder(tsRowType).equal(0, expectedTs))
+
+        // Spark support TIMESTAMP_NTZ since Spark 3.4
+        if (gteqSpark3_4) {
+          sql("CREATE TABLE ts_ntz_tbl (ts_ntz_col TIMESTAMP_NTZ) USING paimon")
+          val ntzRowType = loadTable("ts_ntz_tbl").rowType()
+          val ntzPredicate = SparkV2FilterConverter(ntzRowType).convert(
+            v2Filter("ts_ntz_col = timestamp_ntz'2025-01-15 00:00:00.123'", "ts_ntz_tbl"))
+          // The expected value equals the literal as written, without a time zone shift.
+          val expectedNtz =
+            Timestamp.fromLocalDateTime(LocalDateTime.parse("2025-01-15T00:00:00.123"))
+          assert(ntzPredicate == new PredicateBuilder(ntzRowType).equal(0, expectedNtz))
+        }
+      }
+    }
+  }
+
+  test("V2Filter: EqualTo") {
+    checkV2Filter("int_col = 1")(builder.equal(3, 1))
+  }
+
+  test("V2Filter: EqualNullSafe") {
+    checkV2Filter("int_col <=> 1")(builder.equal(3, 1))
+    checkV2Filter("int_col <=> null")(builder.isNull(3))
+  }
+
+  test("V2Filter: GreaterThan") {
+    checkV2Filter("int_col > 1")(builder.greaterThan(3, 1))
+  }
+
+  test("V2Filter: GreaterThanOrEqual") {
+    checkV2Filter("int_col >= 1")(builder.greaterOrEqual(3, 1))
+  }
+
+  test("V2Filter: LessThan") {
+    checkV2Filter("int_col < 1")(builder.lessThan(3, 1))
+  }
+
+  test("V2Filter: LessThanOrEqual") {
+    checkV2Filter("int_col <= 1")(builder.lessOrEqual(3, 1))
+  }
+
+  test("V2Filter: In") {
+    checkV2Filter("int_col IN (1, 2, 3)")(
+      builder.in(3, List(1, 2, 3).map(_.asInstanceOf[AnyRef]).asJava))
+  }
+
+  test("V2Filter: IsNull") {
+    checkV2Filter("int_col IS NULL")(builder.isNull(3))
+  }
+
+  test("V2Filter: IsNotNull") {
+    checkV2Filter("int_col IS NOT NULL")(builder.isNotNull(3))
+  }
+
+  test("V2Filter: And") {
+    checkV2Filter("int_col > 1 AND int_col < 10")(
+      PredicateBuilder.and(builder.greaterThan(3, 1), builder.lessThan(3, 10)))
+  }
+
+  test("V2Filter: Or") {
+    checkV2Filter("int_col > 1 OR int_col < 10")(
+      PredicateBuilder.or(builder.greaterThan(3, 1), builder.lessThan(3, 10)))
+  }
+
+  test("V2Filter: Not") {
+    checkV2Filter("NOT (int_col > 1)")(builder.greaterThan(3, 1).negate().get())
+  }
+
+  test("V2Filter: StartWith") {
+    checkV2Filter("string_col LIKE 'h%'")(builder.startsWith(0, BinaryString.fromString("h")))
+  }
+
+  test("V2Filter: EndWith") {
+    checkV2Filter("string_col LIKE '%o'")(builder.endsWith(0, BinaryString.fromString("o")))
+  }
+
+  test("V2Filter: Contains") {
+    checkV2Filter("string_col LIKE '%e%'")(builder.contains(0, BinaryString.fromString("e")))
+  }
+
+  /**
+   * Converts the V2 predicate of `condition` (queried on `test_tbl`) and asserts that it equals
+   * `expected`. `expected` is by-name so that it is evaluated only after the conversion.
+   */
+  private def checkV2Filter(condition: String)(
+      expected: => org.apache.paimon.predicate.Predicate): Unit = {
+    val actual = converter.convert(v2Filter(condition))
+    assert(actual == expected)
+  }
+
+  /**
+   * Runs `SELECT * FROM tableName WHERE str`, takes the condition of the first Filter node of
+   * the optimized plan and translates it into a Spark V2 predicate. Fails if the plan has no
+   * Filter node or the condition cannot be translated.
+   */
+  private def v2Filter(str: String, tableName: String = "test_tbl"): Predicate = {
+    val optimizedPlan = sql(s"SELECT * FROM $tableName WHERE $str").queryExecution.optimizedPlan
+    val filterCondition = optimizedPlan.collectFirst { case f: Filter => f.condition }.get
+    translateFilterV2(filterCondition).get
+  }
+}

Reply via email to