This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d93973901 [spark] SparkFilterConverter supports convert EqualNullSafe 
(#2878)
d93973901 is described below

commit d9397390159bf1c7dbaf1dbdd2946ce9d622543f
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Feb 21 09:10:49 2024 +0800

    [spark] SparkFilterConverter supports convert EqualNullSafe (#2878)
---
 .../org/apache/paimon/spark/SparkFilterConverter.java   | 17 +++++++++++++----
 .../apache/paimon/spark/SparkFilterConverterTest.java   | 11 +++++++++++
 2 files changed, 24 insertions(+), 4 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java
index 4f7cee52c..f944ae1a2 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java
@@ -24,6 +24,7 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
 
 import org.apache.spark.sql.sources.And;
+import org.apache.spark.sql.sources.EqualNullSafe;
 import org.apache.spark.sql.sources.EqualTo;
 import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.sources.GreaterThan;
@@ -49,6 +50,7 @@ public class SparkFilterConverter {
     public static final List<String> SUPPORT_FILTERS =
             Arrays.asList(
                     "EqualTo",
+                    "EqualNullSafe",
                     "GreaterThan",
                     "GreaterThanOrEqual",
                     "LessThan",
@@ -76,6 +78,15 @@ public class SparkFilterConverter {
             int index = fieldIndex(eq.attribute());
             Object literal = convertLiteral(index, eq.value());
             return builder.equal(index, literal);
+        } else if (filter instanceof EqualNullSafe) {
+            EqualNullSafe eq = (EqualNullSafe) filter;
+            if (eq.value() == null) {
+                return builder.isNull(fieldIndex(eq.attribute()));
+            } else {
+                int index = fieldIndex(eq.attribute());
+                Object literal = convertLiteral(index, eq.value());
+                return builder.equal(index, literal);
+            }
         } else if (filter instanceof GreaterThan) {
             GreaterThan gt = (GreaterThan) filter;
             int index = fieldIndex(gt.attribute());
@@ -124,15 +135,13 @@ public class SparkFilterConverter {
             return builder.startsWith(index, literal);
         }
 
-        // TODO: In, NotIn, AlwaysTrue, AlwaysFalse, EqualNullSafe
+        // TODO: AlwaysTrue, AlwaysFalse
         throw new UnsupportedOperationException(
                 filter + " is unsupported. Support Filters: " + 
SUPPORT_FILTERS);
     }
 
     public Object convertLiteral(String field, Object value) {
-        int index = fieldIndex(field);
-        DataType type = rowType.getTypeAt(index);
-        return convertJavaObject(type, value);
+        return convertLiteral(fieldIndex(field), value);
     }
 
     private int fieldIndex(String field) {
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java
index 4f7e46433..9f669d493 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java
@@ -27,6 +27,7 @@ import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TimestampType;
 
+import org.apache.spark.sql.sources.EqualNullSafe;
 import org.apache.spark.sql.sources.EqualTo;
 import org.apache.spark.sql.sources.GreaterThan;
 import org.apache.spark.sql.sources.GreaterThanOrEqual;
@@ -118,6 +119,16 @@ public class SparkFilterConverterTest {
         Predicate actualEqNull = converter.convert(eqNull);
         assertThat(actualEqNull).isEqualTo(expectedEqNull);
 
+        EqualNullSafe eqSafe = EqualNullSafe.apply(field, 1);
+        Predicate expectedEqSafe = builder.equal(0, 1);
+        Predicate actualEqSafe = converter.convert(eqSafe);
+        assertThat(actualEqSafe).isEqualTo(expectedEqSafe);
+
+        EqualNullSafe eqNullSafe = EqualNullSafe.apply(field, null);
+        Predicate expectEqNullSafe = builder.isNull(0);
+        Predicate actualEqNullSafe = converter.convert(eqNullSafe);
+        assertThat(actualEqNullSafe).isEqualTo(expectEqNullSafe);
+
         In in = In.apply(field, new Object[] {1, null, 2});
         Predicate expectedIn = builder.in(0, Arrays.asList(1, null, 2));
         Predicate actualIn = converter.convert(in);

Reply via email to the commits mailing list.