This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7bfa946c82 [spark] Fix EqualNullSafe is not correct when column has null value. (#6943)
7bfa946c82 is described below
commit 7bfa946c82695f5c669adb8a4e354dcbcb645782
Author: venus <[email protected]>
AuthorDate: Sun Jan 4 10:59:08 2026 +0800
[spark] Fix EqualNullSafe is not correct when column has null value. (#6943)
---
.../apache/paimon/spark/SparkV2FilterConverter.scala | 2 +-
.../org/apache/paimon/spark/PaimonSourceTest.scala | 18 ++++++++++++++++++
.../spark/sql/SparkV2FilterConverterTestBase.scala | 2 +-
3 files changed, 20 insertions(+), 2 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
index 84937e2fdd..03638ca037 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
@@ -62,7 +62,7 @@ case class SparkV2FilterConverter(rowType: RowType) extends Logging {
if (literal == null) {
builder.isNull(transform)
} else {
- builder.equal(transform, literal)
+ PredicateBuilder.and(builder.isNotNull(transform), builder.equal(transform, literal))
}
case _ =>
throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
index 58cf9868dc..e8b685664c 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
@@ -30,6 +30,24 @@ class PaimonSourceTest extends PaimonSparkTestBase with StreamTest {
import testImplicits._
+ test("Paimon Source: EQUAL_NULL_SAFE") {
+ withTempDir {
+ _ =>
+ {
+ val TableSnapshotState(_, _, _, _, _) = prepareTableAndGetLocation(0, hasPk = true)
+ spark.sql("INSERT INTO T VALUES (1, CAST(null as string)), (2, CAST(null as string))")
+ val currentResult = () => spark.sql("SELECT * FROM T WHERE !(b <=> 'v_1')")
+ checkAnswer(currentResult(), Seq(Row(1, null), Row(2, null)))
+ spark.sql("INSERT INTO T VALUES (3, 'v_1'), (4, CAST(null as string))")
+ checkAnswer(currentResult(), Seq(Row(1, null), Row(2, null), Row(4, null)))
+ val valueDF = spark.sql("SELECT * FROM T WHERE b <=> 'v_1'")
+ checkAnswer(valueDF, Seq(Row(3, "v_1")))
+ val nullDF = spark.sql("SELECT * FROM T WHERE b <=> null")
+ checkAnswer(nullDF, Seq(Row(1, null), Row(2, null), Row(4, null)))
+ }
+ }
+ }
+
test("Paimon Source: default scan mode") {
withTempDir {
checkpointDir =>
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
index 05704e3b36..2d90777fd7 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
@@ -220,7 +220,7 @@ abstract class SparkV2FilterConverterTestBase extends PaimonSparkTestBase {
test("V2Filter: EqualNullSafe") {
var filter = "int_col <=> 1"
var actual = converter.convert(v2Filter(filter)).get
- assert(actual.equals(builder.equal(3, 1)))
+ assert(actual.equals(PredicateBuilder.and(builder.isNotNull(3), builder.equal(3, 1))))
checkAnswer(sql(s"SELECT int_col from test_tbl WHERE $filter ORDER BY int_col"), Seq(Row(1)))
assert(scanFilesCount(filter) == 1)