This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c9f9e8feb [flink] SqlPredicate add not in (#5957)
3c9f9e8feb is described below

commit 3c9f9e8feba4b995307845735adb63f3499c24eb
Author: WenDing-Y <1062698...@qq.com>
AuthorDate: Fri Jul 25 18:50:43 2025 +0800

    [flink] SqlPredicate add not in (#5957)
---
 .../paimon/flink/predicate/SimpleSqlPredicateConvertor.java | 13 +++++++++++++
 .../org/apache/paimon/flink/utils/FlinkCalciteClasses.java  |  4 ++++
 .../flink/predicate/SimpleSqlPredicateConvertorTest.java    | 13 +++++++++++++
 3 files changed, 30 insertions(+)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertor.java
index dcdcc31a9f..d54c76f6f5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertor.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertor.java
@@ -93,6 +93,19 @@ public class SimpleSqlPredicateConvertor {
                     list.add(literal);
                 }
                 return builder.in(index, list);
+            } else if (kind == calciteClasses.sqlKindDelegate().notIn()) {
+                int index = getFieldIndex(left.toString());
+                List<?> elementslist = calciteClasses.sqlNodeListDelegate().getList(right);
+
+                List<Object> list = new ArrayList<>();
+                for (Object sqlNode : elementslist) {
+                    Object literal =
+                            TypeUtils.castFromString(
+                                    calciteClasses.sqlLiteralDelegate().toValue(sqlNode),
+                                    rowType.getFieldTypes().get(index));
+                    list.add(literal);
+                }
+                return builder.in(index, list).negate().get();
             }
         } else if (calciteClasses.sqlOperatorDelegate().instanceOfSqlPostfixOperator(operator)) {
             Object child =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCalciteClasses.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCalciteClasses.java
index 396a3c4c07..5410dc3533 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCalciteClasses.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCalciteClasses.java
@@ -207,6 +207,10 @@ public class FlinkCalciteClasses {
         public Object not() throws NoSuchFieldException, IllegalAccessException {
             return clazz.getField("NOT").get(null);
         }
+
+        public Object notIn() throws NoSuchFieldException, IllegalAccessException {
+            return clazz.getField("NOT_IN").get(null);
+        }
     }
 
     /** Accessing org.apache.calcite.sql.parser.SqlParser$Config by Reflection. */
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertorTest.java
index 55695da03c..a68d15e746 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertorTest.java
@@ -162,6 +162,19 @@ class SimpleSqlPredicateConvertorTest {
                 .isEqualTo(predicateBuilder.in(predicateBuilder.indexOf("a"), elements));
     }
 
+    @Test
+    public void testNotIn() throws Exception {
+        Predicate predicate =
+                simpleSqlPredicateConvertor.convertSqlToPredicate("a not in ('1','2')");
+        List<Object> elements = Lists.newArrayList(1, 2);
+        Assertions.assertThat(predicate)
+                .isEqualTo(
+                        predicateBuilder
+                                .in(predicateBuilder.indexOf("a"), elements)
+                                .negate()
+                                .get());
+    }
+
     @Test
     public void testIsNull() throws Exception {
         Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate("a is null ");

Reply via email to