LadyForest commented on code in PR #23478:
URL: https://github.com/apache/flink/pull/23478#discussion_r1369833456


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala:
##########
@@ -403,47 +445,71 @@ object ScalarOperatorGens {
            |    equals(${right.resultTerm}.getBinarySection());
            |}
            |""".stripMargin
-      GeneratedExpression(resultTerm, nullTerm, code, resultType)
+      wrapExpressionIfNonEq(

Review Comment:
   Similarly, here we cannot call `wrapExpressionIfNonEq` directly.
   
   I've composed a test case to reproduce the issue.
   ```scala
     @Test
     def debugging(): Unit = {
       //You can set the class access modifier to public temporarily for test 
       val rawType = new RawType[LogicalTypesTest.Human](
         classOf[LogicalTypesTest.Human],
         new KryoSerializer[LogicalTypesTest.Human](
           classOf[LogicalTypesTest.Human],
           new ExecutionConfig))
       val className = "org.apache.flink.table.types.LogicalTypesTest$Human"
       val serializerString = rawType.asSerializableString
       tEnv.executeSql(
         s"""
            |create table test_in1 (
            |  content raw('$className', '$serializerString')
            |) with ('connector' = 'filesystem', 'format' = 'raw', 'path' = 
'/tmp/test-dir-in-1')
            |""".stripMargin)
       tEnv.executeSql(
         s"""
            |create table test_in2 (
            |  content raw('$className', '$serializerString')
            |) with ('connector' = 'filesystem', 'format' = 'raw', 'path' = 
'/tmp/test-dir-in-2')
            |""".stripMargin)
       tEnv.executeSql(
         s"""
            |create table test_out (
            |  content raw('$className', '$serializerString')
            |) with ('connector' = 'filesystem', 'format' = 'raw', 'path' = 
'/tmp/test-dir-out')
            |""".stripMargin)
       tEnv
         .executeSql(
           """
             |insert into test_out
             |select c2 from (
             |  select in1.content as c1, in2.content as c2
             |  from test_in1 in1 join test_in2 in2
             |  on in1.content <> in2.content
             |) tmp
             |""".stripMargin)
         .await()
     }
   ```
   The correct and wrong generated non-equi condition are as follows:
   ```java
   // correct
   boolean isNull$5 = isNull$0 || isNull$3;
             boolean result$5 = false;
             if (!isNull$5) {
               
field$2.ensureMaterialized(typeSerializer$1.getInnerSerializer());
               
field$4.ensureMaterialized(typeSerializer$1.getInnerSerializer());
               result$5 =
                 !field$2.getBinarySection().
                 equals(field$4.getBinarySection());
             }
             
             return result$5;
   // wrong
   boolean isNull$5 = isNull$0 || isNull$3;
             boolean result$5 = false;
             if (!isNull$5) {
               
field$2.ensureMaterialized(typeSerializer$1.getInnerSerializer());
               
field$4.ensureMaterialized(typeSerializer$1.getInnerSerializer());
               result$5 =
                 field$2.getBinarySection().
                 equals(field$4.getBinarySection());
             }
             
             return (!result$5);
   ```



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala:
##########
@@ -345,44 +345,86 @@ object ScalarOperatorGens {
     }
   }
 
-  def generateEquals(
+  private def wrapExpressionIfNonEq(
+      isNonEq: Boolean,
+      equalsExpr: GeneratedExpression,
+      resultType: LogicalType): GeneratedExpression = {
+    if (isNonEq) {
+      GeneratedExpression(
+        s"(!${equalsExpr.resultTerm})",
+        equalsExpr.nullTerm,
+        equalsExpr.code,
+        resultType)
+    } else {
+      equalsExpr
+    }
+  }
+
+  private def generateEqualAndNonEqual(
       ctx: CodeGeneratorContext,
       left: GeneratedExpression,
       right: GeneratedExpression,
+      operator: String,
       resultType: LogicalType): GeneratedExpression = {
+
     checkImplicitConversionValidity(left, right)
+
+    val nonEq = operator match {
+      case "==" => false
+      case "!=" => true
+      case _ => throw new CodeGenException(s"Unsupported boolean comparison 
'$operator'.")
+    }
     val canEqual = isInteroperable(left.resultType, right.resultType)
+
     if (isCharacterString(left.resultType) && 
isCharacterString(right.resultType)) {
-      generateOperatorIfNotNull(ctx, resultType, left, right)(
-        (leftTerm, rightTerm) => s"$leftTerm.equals($rightTerm)")
+      wrapExpressionIfNonEq(

Review Comment:
   Add the following test to MiscITCase can reproduce the issue
   ```scala
     @Test
     def testCompareNullInFilter(): Unit = {
       checkQuery(
         Seq((null, 2), ("b", 1)),
         "SELECT f1 FROM Table1 WHERE f0 <> 'a'",
         Seq(Tuple1(1))
       )
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to