This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bfa946c82 [spark] Fix EqualNullSafe is not correct when column has null value. (#6943)
7bfa946c82 is described below

commit 7bfa946c82695f5c669adb8a4e354dcbcb645782
Author: venus <[email protected]>
AuthorDate: Sun Jan 4 10:59:08 2026 +0800

    [spark] Fix EqualNullSafe is not correct when column has null value. (#6943)
---
 .../apache/paimon/spark/SparkV2FilterConverter.scala   |  2 +-
 .../org/apache/paimon/spark/PaimonSourceTest.scala     | 18 ++++++++++++++++++
 .../spark/sql/SparkV2FilterConverterTestBase.scala     |  2 +-
 3 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
index 84937e2fdd..03638ca037 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
@@ -62,7 +62,7 @@ case class SparkV2FilterConverter(rowType: RowType) extends Logging {
             if (literal == null) {
               builder.isNull(transform)
             } else {
-              builder.equal(transform, literal)
+              PredicateBuilder.and(builder.isNotNull(transform), builder.equal(transform, literal))
             }
           case _ =>
             throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
index 58cf9868dc..e8b685664c 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
@@ -30,6 +30,24 @@ class PaimonSourceTest extends PaimonSparkTestBase with StreamTest {
 
   import testImplicits._
 
+  test("Paimon Source: EQUAL_NULL_SAFE") {
+    withTempDir {
+      _ =>
+        {
+          val TableSnapshotState(_, _, _, _, _) = prepareTableAndGetLocation(0, hasPk = true)
+          spark.sql("INSERT INTO T VALUES (1, CAST(null as string)), (2, CAST(null as string))")
+          val currentResult = () => spark.sql("SELECT * FROM T WHERE !(b <=> 'v_1')")
+          checkAnswer(currentResult(), Seq(Row(1, null), Row(2, null)))
+          spark.sql("INSERT INTO T VALUES (3, 'v_1'), (4, CAST(null as string))")
+          checkAnswer(currentResult(), Seq(Row(1, null), Row(2, null), Row(4, null)))
+          val valueDF = spark.sql("SELECT * FROM T WHERE b <=> 'v_1'")
+          checkAnswer(valueDF, Seq(Row(3, "v_1")))
+          val nullDF = spark.sql("SELECT * FROM T WHERE b <=> null")
+          checkAnswer(nullDF, Seq(Row(1, null), Row(2, null), Row(4, null)))
+        }
+    }
+  }
+
   test("Paimon Source: default scan mode") {
     withTempDir {
       checkpointDir =>
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
index 05704e3b36..2d90777fd7 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
@@ -220,7 +220,7 @@ abstract class SparkV2FilterConverterTestBase extends PaimonSparkTestBase {
   test("V2Filter: EqualNullSafe") {
     var filter = "int_col <=> 1"
     var actual = converter.convert(v2Filter(filter)).get
-    assert(actual.equals(builder.equal(3, 1)))
+    assert(actual.equals(PredicateBuilder.and(builder.isNotNull(3), builder.equal(3, 1))))
     checkAnswer(sql(s"SELECT int_col from test_tbl WHERE $filter ORDER BY int_col"), Seq(Row(1)))
     assert(scanFilesCount(filter) == 1)
 

Reply via email to