This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new dbd129d3c7 [spark] Introduce SparkV2FilterConverter (#4915)
dbd129d3c7 is described below
commit dbd129d3c7c6a7c39210ab47efc8c1275c7d7ce6
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Jan 17 10:59:37 2025 +0800
[spark] Introduce SparkV2FilterConverter (#4915)
---
.../spark/sql/SparkV2FilterConverterTest.scala | 21 ++
.../spark/sql/SparkV2FilterConverterTest.scala | 21 ++
.../spark/sql/SparkV2FilterConverterTest.scala | 21 ++
.../spark/sql/SparkV2FilterConverterTest.scala | 21 ++
.../apache/paimon/spark/SparkFilterConverter.java | 4 +-
.../paimon/spark/SparkV2FilterConverter.scala | 236 +++++++++++++++++++++
.../scala/org/apache/spark/sql/PaimonUtils.scala | 16 +-
.../apache/paimon/spark/PaimonSparkTestBase.scala | 1 +
.../spark/sql/SparkV2FilterConverterTestBase.scala | 224 +++++++++++++++++++
9 files changed, 560 insertions(+), 5 deletions(-)
diff --git a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala
new file mode 100644
index 0000000000..21c4c8a495
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class SparkV2FilterConverterTest extends SparkV2FilterConverterTestBase {}
diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala
new file mode 100644
index 0000000000..21c4c8a495
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class SparkV2FilterConverterTest extends SparkV2FilterConverterTestBase {}
diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala
new file mode 100644
index 0000000000..21c4c8a495
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class SparkV2FilterConverterTest extends SparkV2FilterConverterTestBase {}
diff --git a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala
new file mode 100644
index 0000000000..21c4c8a495
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class SparkV2FilterConverterTest extends SparkV2FilterConverterTestBase {}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java
index 6b9375e556..2050c937c6 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java
@@ -105,10 +105,10 @@ public class SparkFilterConverter {
return builder.equal(index, literal);
} else if (filter instanceof EqualNullSafe) {
EqualNullSafe eq = (EqualNullSafe) filter;
+ int index = fieldIndex(eq.attribute());
if (eq.value() == null) {
- return builder.isNull(fieldIndex(eq.attribute()));
+ return builder.isNull(index);
} else {
- int index = fieldIndex(eq.attribute());
Object literal = convertLiteral(index, eq.value());
return builder.equal(index, literal);
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
new file mode 100644
index 0000000000..11ef302672
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark
+
+import org.apache.paimon.data.{BinaryString, Decimal, Timestamp}
+import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
+import org.apache.paimon.types.{DataTypeRoot, DecimalType, RowType}
+import org.apache.paimon.types.DataTypeRoot._
+
+import org.apache.spark.sql.connector.expressions.{Literal, NamedReference}
+import org.apache.spark.sql.connector.expressions.filter.{And, Not, Or, Predicate => SparkPredicate}
+
+import scala.collection.JavaConverters._
+
+/** Conversion from [[SparkPredicate]] to [[Predicate]]. */
+case class SparkV2FilterConverter(rowType: RowType) {
+
+ import org.apache.paimon.spark.SparkV2FilterConverter._
+
+ val builder = new PredicateBuilder(rowType)
+
+ def convert(sparkPredicate: SparkPredicate): Predicate = {
+ sparkPredicate.name() match {
+ case EQUAL_TO =>
+ BinaryPredicate.unapply(sparkPredicate) match {
+ case Some((fieldName, literal)) =>
+ // TODO deal with isNaN
+ val index = fieldIndex(fieldName)
+ builder.equal(index, convertLiteral(index, literal))
+ }
+
+ case EQUAL_NULL_SAFE =>
+ BinaryPredicate.unapply(sparkPredicate) match {
+ case Some((fieldName, literal)) =>
+ val index = fieldIndex(fieldName)
+ if (literal == null) {
+ builder.isNull(index)
+ } else {
+ builder.equal(index, convertLiteral(index, literal))
+ }
+ }
+
+ case GREATER_THAN =>
+ BinaryPredicate.unapply(sparkPredicate) match {
+ case Some((fieldName, literal)) =>
+ val index = fieldIndex(fieldName)
+ builder.greaterThan(index, convertLiteral(index, literal))
+ }
+
+ case GREATER_THAN_OR_EQUAL =>
+ BinaryPredicate.unapply(sparkPredicate) match {
+ case Some((fieldName, literal)) =>
+ val index = fieldIndex(fieldName)
+ builder.greaterOrEqual(index, convertLiteral(index, literal))
+ }
+
+ case LESS_THAN =>
+ BinaryPredicate.unapply(sparkPredicate) match {
+ case Some((fieldName, literal)) =>
+ val index = fieldIndex(fieldName)
+ builder.lessThan(index, convertLiteral(index, literal))
+ }
+
+ case LESS_THAN_OR_EQUAL =>
+ BinaryPredicate.unapply(sparkPredicate) match {
+ case Some((fieldName, literal)) =>
+ val index = fieldIndex(fieldName)
+ builder.lessOrEqual(index, convertLiteral(index, literal))
+ }
+
+ case IN =>
+ MultiPredicate.unapply(sparkPredicate) match {
+ case Some((fieldName, literals)) =>
+ val index = fieldIndex(fieldName)
+ literals.map(convertLiteral(index, _)).toList.asJava
+          builder.in(index, literals.map(convertLiteral(index, _)).toList.asJava)
+ }
+
+ case IS_NULL =>
+ UnaryPredicate.unapply(sparkPredicate) match {
+ case Some(fieldName) =>
+ builder.isNull(fieldIndex(fieldName))
+ }
+
+ case IS_NOT_NULL =>
+ UnaryPredicate.unapply(sparkPredicate) match {
+ case Some(fieldName) =>
+ builder.isNotNull(fieldIndex(fieldName))
+ }
+
+ case AND =>
+ val and = sparkPredicate.asInstanceOf[And]
+ PredicateBuilder.and(convert(and.left), convert(and.right()))
+
+ case OR =>
+ val or = sparkPredicate.asInstanceOf[Or]
+ PredicateBuilder.or(convert(or.left), convert(or.right()))
+
+ case NOT =>
+ val not = sparkPredicate.asInstanceOf[Not]
+ val negate = convert(not.child()).negate()
+ if (negate.isPresent) {
+ negate.get()
+ } else {
+          throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
+ }
+
+ case STRING_START_WITH =>
+ BinaryPredicate.unapply(sparkPredicate) match {
+ case Some((fieldName, literal)) =>
+ val index = fieldIndex(fieldName)
+ builder.startsWith(index, convertLiteral(index, literal))
+ }
+
+ case STRING_END_WITH =>
+ BinaryPredicate.unapply(sparkPredicate) match {
+ case Some((fieldName, literal)) =>
+ val index = fieldIndex(fieldName)
+ builder.endsWith(index, convertLiteral(index, literal))
+ }
+
+ case STRING_CONTAINS =>
+ BinaryPredicate.unapply(sparkPredicate) match {
+ case Some((fieldName, literal)) =>
+ val index = fieldIndex(fieldName)
+ builder.contains(index, convertLiteral(index, literal))
+ }
+
+ // TODO: AlwaysTrue, AlwaysFalse
+      case _ => throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
+ }
+ }
+
+ private object UnaryPredicate {
+ def unapply(sparkPredicate: SparkPredicate): Option[String] = {
+ sparkPredicate.children() match {
+ case Array(n: NamedReference) => Some(toFieldName(n))
+ case _ => None
+ }
+ }
+ }
+
+ private object BinaryPredicate {
+ def unapply(sparkPredicate: SparkPredicate): Option[(String, Any)] = {
+ sparkPredicate.children() match {
+        case Array(l: NamedReference, r: Literal[_]) => Some((toFieldName(l), r.value))
+        case Array(l: Literal[_], r: NamedReference) => Some((toFieldName(r), l.value))
+ case _ => None
+ }
+ }
+ }
+
+ private object MultiPredicate {
+    def unapply(sparkPredicate: SparkPredicate): Option[(String, Array[Any])] = {
+ sparkPredicate.children() match {
+ case Array(first: NamedReference, rest @ _*)
+ if rest.nonEmpty && rest.forall(_.isInstanceOf[Literal[_]]) =>
+          Some(toFieldName(first), rest.map(_.asInstanceOf[Literal[_]].value).toArray)
+ case _ => None
+ }
+ }
+ }
+
+ private def fieldIndex(fieldName: String): Int = {
+ val index = rowType.getFieldIndex(fieldName)
+ // TODO: support nested field
+ if (index == -1) {
+      throw new UnsupportedOperationException(s"Nested field '$fieldName' is unsupported.")
+ }
+ index
+ }
+
+ private def convertLiteral(index: Int, value: Any): AnyRef = {
+ if (value == null) {
+ return null
+ }
+
+ val dataType = rowType.getTypeAt(index)
+ dataType.getTypeRoot match {
+      case BOOLEAN | BIGINT | DOUBLE | TINYINT | SMALLINT | INTEGER | FLOAT | DATE =>
+ value.asInstanceOf[AnyRef]
+ case DataTypeRoot.VARCHAR =>
+ BinaryString.fromString(value.toString)
+ case DataTypeRoot.DECIMAL =>
+ val decimalType = dataType.asInstanceOf[DecimalType]
+ val precision = decimalType.getPrecision
+ val scale = decimalType.getScale
+ Decimal.fromBigDecimal(
+          value.asInstanceOf[org.apache.spark.sql.types.Decimal].toJavaBigDecimal,
+ precision,
+ scale)
+      case DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE | DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE =>
+ Timestamp.fromMicros(value.asInstanceOf[Long])
+ case _ =>
+ throw new UnsupportedOperationException(
+ s"Convert value: $value to datatype: $dataType is unsupported.")
+ }
+ }
+
+  private def toFieldName(ref: NamedReference): String = ref.fieldNames().mkString(".")
+}
+
+object SparkV2FilterConverter {
+
+ private val EQUAL_TO = "="
+ private val EQUAL_NULL_SAFE = "<=>"
+ private val GREATER_THAN = ">"
+ private val GREATER_THAN_OR_EQUAL = ">="
+ private val LESS_THAN = "<"
+ private val LESS_THAN_OR_EQUAL = "<="
+ private val IN = "IN"
+ private val IS_NULL = "IS_NULL"
+ private val IS_NOT_NULL = "IS_NOT_NULL"
+ private val AND = "AND"
+ private val OR = "OR"
+ private val NOT = "NOT"
+ private val STRING_START_WITH = "STARTS_WITH"
+ private val STRING_END_WITH = "ENDS_WITH"
+ private val STRING_CONTAINS = "CONTAINS"
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
index cc49e787dc..d01a840f8e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
@@ -24,8 +24,10 @@ import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
+import org.apache.spark.sql.connector.expressions.FieldReference
+import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.translateFilterV2WithMapping
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.PartitioningUtils
@@ -68,8 +70,16 @@ object PaimonUtils {
DataSourceStrategy.translateFilter(predicate, supportNestedPredicatePushdown)
}
- def fieldReference(name: String): NamedReference = {
- FieldReference(Seq(name))
+ def translateFilterV2(predicate: Expression): Option[Predicate] = {
+ translateFilterV2WithMapping(predicate, None)
+ }
+
+ def fieldReference(name: String): FieldReference = {
+ fieldReference(Seq(name))
+ }
+
+ def fieldReference(parts: Seq[String]): FieldReference = {
+ FieldReference(parts)
}
def bytesToString(size: Long): String = {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index 9a1647da81..47f0c5a7d3 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -81,6 +81,7 @@ class PaimonSparkTestBase
super.beforeAll()
spark.sql(s"USE paimon")
spark.sql(s"CREATE DATABASE IF NOT EXISTS paimon.$dbName0")
+ spark.sql(s"USE paimon.$dbName0")
}
override protected def afterAll(): Unit = {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
new file mode 100644
index 0000000000..b9cbc29b3a
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.data.{BinaryString, Decimal, Timestamp}
+import org.apache.paimon.predicate.PredicateBuilder
+import org.apache.paimon.spark.{PaimonSparkTestBase, SparkV2FilterConverter}
+import org.apache.paimon.types.RowType
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.PaimonUtils.translateFilterV2
+import org.apache.spark.sql.catalyst.plans.logical.Filter
+import org.apache.spark.sql.connector.expressions.filter.Predicate
+
+import java.time.{LocalDate, LocalDateTime}
+
+import scala.collection.JavaConverters._
+
+/** Test for [[SparkV2FilterConverter]]. */
+abstract class SparkV2FilterConverterTestBase extends PaimonSparkTestBase {
+
+  // Added to disable the automatic generation of the not-null filter, which impacts these tests.
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.sql.constraintPropagation.enabled", "false")
+ }
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+ sql("""
+ |CREATE TABLE test_tbl (
+ | string_col STRING,
+ | byte_col BYTE,
+ | short_col SHORT,
+ | int_col INT,
+ | long_col LONG,
+ | float_col FLOAT,
+ | double_col DOUBLE,
+ | decimal_col DECIMAL(10, 5),
+ | boolean_col BOOLEAN,
+ | date_col DATE,
+ | binary BINARY
+ |) USING paimon
+ |""".stripMargin)
+ }
+
+ override protected def afterAll(): Unit = {
+ sql("DROP TABLE IF EXISTS test_tbl")
+ super.afterAll()
+ }
+
+ lazy val rowType: RowType = loadTable("test_tbl").rowType()
+
+ lazy val builder = new PredicateBuilder(rowType)
+
+ lazy val converter: SparkV2FilterConverter = SparkV2FilterConverter(rowType)
+
+ test("V2Filter: all types") {
+ var actual = converter.convert(v2Filter("string_col = 'hello'"))
+ assert(actual.equals(builder.equal(0, BinaryString.fromString("hello"))))
+
+ actual = converter.convert(v2Filter("byte_col = 1"))
+ assert(actual.equals(builder.equal(1, 1.toByte)))
+
+ actual = converter.convert(v2Filter("short_col = 1"))
+ assert(actual.equals(builder.equal(2, 1.toShort)))
+
+ actual = converter.convert(v2Filter("int_col = 1"))
+ assert(actual.equals(builder.equal(3, 1)))
+
+ actual = converter.convert(v2Filter("long_col = 1"))
+ assert(actual.equals(builder.equal(4, 1L)))
+
+ actual = converter.convert(v2Filter("float_col = 1.0"))
+ assert(actual.equals(builder.equal(5, 1.0f)))
+
+ actual = converter.convert(v2Filter("double_col = 1.0"))
+ assert(actual.equals(builder.equal(6, 1.0d)))
+
+ actual = converter.convert(v2Filter("decimal_col = 12.12345"))
+ assert(
+ actual.equals(
+        builder.equal(7, Decimal.fromBigDecimal(new java.math.BigDecimal("12.12345"), 10, 5))))
+
+ actual = converter.convert(v2Filter("boolean_col = true"))
+ assert(actual.equals(builder.equal(8, true)))
+
+    actual = converter.convert(v2Filter("date_col = cast('2025-01-15' as date)"))
+ val localDate = LocalDate.parse("2025-01-15")
+ val epochDay = localDate.toEpochDay.toInt
+ assert(actual.equals(builder.equal(9, epochDay)))
+
+ intercept[UnsupportedOperationException] {
+ actual = converter.convert(v2Filter("binary = binary('b1')"))
+ }
+ }
+
+ test("V2Filter: timestamp and timestamp_ntz") {
+ withTimeZone("Asia/Shanghai") {
+ withTable("ts_tbl", "ts_ntz_tbl") {
+ sql("CREATE TABLE ts_tbl (ts_col TIMESTAMP) USING paimon")
+ val rowType1 = loadTable("ts_tbl").rowType()
+ val converter1 = SparkV2FilterConverter(rowType1)
+ val actual1 =
+          converter1.convert(v2Filter("ts_col = timestamp'2025-01-15 00:00:00.123'", "ts_tbl"))
+ assert(
+ actual1.equals(new PredicateBuilder(rowType1)
+            .equal(0, Timestamp.fromLocalDateTime(LocalDateTime.parse("2025-01-14T16:00:00.123")))))
+
+        // Spark supports TIMESTAMP_NTZ since Spark 3.4
+ if (gteqSpark3_4) {
+          sql("CREATE TABLE ts_ntz_tbl (ts_ntz_col TIMESTAMP_NTZ) USING paimon")
+ val rowType2 = loadTable("ts_ntz_tbl").rowType()
+ val converter2 = SparkV2FilterConverter(rowType2)
+ val actual2 = converter2.convert(
+            v2Filter("ts_ntz_col = timestamp_ntz'2025-01-15 00:00:00.123'", "ts_ntz_tbl"))
+ assert(actual2.equals(new PredicateBuilder(rowType2)
+            .equal(0, Timestamp.fromLocalDateTime(LocalDateTime.parse("2025-01-15T00:00:00.123")))))
+ }
+ }
+ }
+ }
+
+ test("V2Filter: EqualTo") {
+ val actual = converter.convert(v2Filter("int_col = 1"))
+ assert(actual.equals(builder.equal(3, 1)))
+ }
+
+ test("V2Filter: EqualNullSafe") {
+ var actual = converter.convert(v2Filter("int_col <=> 1"))
+ assert(actual.equals(builder.equal(3, 1)))
+
+ actual = converter.convert(v2Filter("int_col <=> null"))
+ assert(actual.equals(builder.isNull(3)))
+ }
+
+ test("V2Filter: GreaterThan") {
+ val actual = converter.convert(v2Filter("int_col > 1"))
+ assert(actual.equals(builder.greaterThan(3, 1)))
+ }
+
+ test("V2Filter: GreaterThanOrEqual") {
+ val actual = converter.convert(v2Filter("int_col >= 1"))
+ assert(actual.equals(builder.greaterOrEqual(3, 1)))
+ }
+
+ test("V2Filter: LessThan") {
+ val actual = converter.convert(v2Filter("int_col < 1"))
+ assert(actual.equals(builder.lessThan(3, 1)))
+ }
+
+ test("V2Filter: LessThanOrEqual") {
+ val actual = converter.convert(v2Filter("int_col <= 1"))
+ assert(actual.equals(builder.lessOrEqual(3, 1)))
+ }
+
+ test("V2Filter: In") {
+ val actual = converter.convert(v2Filter("int_col IN (1, 2, 3)"))
+    assert(actual.equals(builder.in(3, List(1, 2, 3).map(_.asInstanceOf[AnyRef]).asJava)))
+ }
+
+ test("V2Filter: IsNull") {
+ val actual = converter.convert(v2Filter("int_col IS NULL"))
+ assert(actual.equals(builder.isNull(3)))
+ }
+
+ test("V2Filter: IsNotNull") {
+ val actual = converter.convert(v2Filter("int_col IS NOT NULL"))
+ assert(actual.equals(builder.isNotNull(3)))
+ }
+
+ test("V2Filter: And") {
+ val actual = converter.convert(v2Filter("int_col > 1 AND int_col < 10"))
+    assert(actual.equals(PredicateBuilder.and(builder.greaterThan(3, 1), builder.lessThan(3, 10))))
+ }
+
+ test("V2Filter: Or") {
+ val actual = converter.convert(v2Filter("int_col > 1 OR int_col < 10"))
+    assert(actual.equals(PredicateBuilder.or(builder.greaterThan(3, 1), builder.lessThan(3, 10))))
+ }
+
+ test("V2Filter: Not") {
+ val actual = converter.convert(v2Filter("NOT (int_col > 1)"))
+ assert(actual.equals(builder.greaterThan(3, 1).negate().get()))
+ }
+
+ test("V2Filter: StartWith") {
+ val actual = converter.convert(v2Filter("string_col LIKE 'h%'"))
+ assert(actual.equals(builder.startsWith(0, BinaryString.fromString("h"))))
+ }
+
+ test("V2Filter: EndWith") {
+ val actual = converter.convert(v2Filter("string_col LIKE '%o'"))
+ assert(actual.equals(builder.endsWith(0, BinaryString.fromString("o"))))
+ }
+
+ test("V2Filter: Contains") {
+ val actual = converter.convert(v2Filter("string_col LIKE '%e%'"))
+ assert(actual.equals(builder.contains(0, BinaryString.fromString("e"))))
+ }
+
+  private def v2Filter(str: String, tableName: String = "test_tbl"): Predicate = {
+    val condition = sql(s"SELECT * FROM $tableName WHERE $str").queryExecution.optimizedPlan
+ .collectFirst { case f: Filter => f }
+ .get
+ .condition
+ translateFilterV2(condition).get
+ }
+}