LadyForest commented on code in PR #23478: URL: https://github.com/apache/flink/pull/23478#discussion_r1369833456
########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala: ########## @@ -403,47 +445,71 @@ object ScalarOperatorGens { | equals(${right.resultTerm}.getBinarySection()); |} |""".stripMargin - GeneratedExpression(resultTerm, nullTerm, code, resultType) + wrapExpressionIfNonEq( Review Comment: Similarly, here we cannot call `wrapExpressionIfNonEq` directly. I've composed a test case to reproduce the issue. ```scala @Test def debugging(): Unit = { //You can set the class access modifier to public temporarily for test val rawType = new RawType[LogicalTypesTest.Human]( classOf[LogicalTypesTest.Human], new KryoSerializer[LogicalTypesTest.Human]( classOf[LogicalTypesTest.Human], new ExecutionConfig)) val className = "org.apache.flink.table.types.LogicalTypesTest$Human" val serializerString = rawType.asSerializableString tEnv.executeSql( s""" |create table test_in1 ( | content raw('$className', '$serializerString') |) with ('connector' = 'filesystem', 'format' = 'raw', 'path' = '/tmp/test-dir-in-1') |""".stripMargin) tEnv.executeSql( s""" |create table test_in2 ( | content raw('$className', '$serializerString') |) with ('connector' = 'filesystem', 'format' = 'raw', 'path' = '/tmp/test-dir-in-2') |""".stripMargin) tEnv.executeSql( s""" |create table test_out ( | content raw('$className', '$serializerString') |) with ('connector' = 'filesystem', 'format' = 'raw', 'path' = '/tmp/test-dir-out') |""".stripMargin) tEnv .executeSql( """ |insert into test_out |select c2 from ( | select in1.content as c1, in2.content as c2 | from test_in1 in1 join test_in2 in2 | on in1.content <> in2.content |) tmp |""".stripMargin) .await() } ``` The correct and incorrect generated non-equi conditions are as follows: ```java // correct boolean isNull$5 = isNull$0 || isNull$3; boolean result$5 = false; if (!isNull$5) { field$2.ensureMaterialized(typeSerializer$1.getInnerSerializer()); 
field$4.ensureMaterialized(typeSerializer$1.getInnerSerializer()); result$5 = !field$2.getBinarySection(). equals(field$4.getBinarySection()); } return result$5; // wrong boolean isNull$5 = isNull$0 || isNull$3; boolean result$5 = false; if (!isNull$5) { field$2.ensureMaterialized(typeSerializer$1.getInnerSerializer()); field$4.ensureMaterialized(typeSerializer$1.getInnerSerializer()); result$5 = field$2.getBinarySection(). equals(field$4.getBinarySection()); } return (!result$5); ``` ########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala: ########## @@ -345,44 +345,86 @@ object ScalarOperatorGens { } } - def generateEquals( + private def wrapExpressionIfNonEq( + isNonEq: Boolean, + equalsExpr: GeneratedExpression, + resultType: LogicalType): GeneratedExpression = { + if (isNonEq) { + GeneratedExpression( + s"(!${equalsExpr.resultTerm})", + equalsExpr.nullTerm, + equalsExpr.code, + resultType) + } else { + equalsExpr + } + } + + private def generateEqualAndNonEqual( ctx: CodeGeneratorContext, left: GeneratedExpression, right: GeneratedExpression, + operator: String, resultType: LogicalType): GeneratedExpression = { + checkImplicitConversionValidity(left, right) + + val nonEq = operator match { + case "==" => false + case "!=" => true + case _ => throw new CodeGenException(s"Unsupported boolean comparison '$operator'.") + } val canEqual = isInteroperable(left.resultType, right.resultType) + if (isCharacterString(left.resultType) && isCharacterString(right.resultType)) { - generateOperatorIfNotNull(ctx, resultType, left, right)( - (leftTerm, rightTerm) => s"$leftTerm.equals($rightTerm)") + wrapExpressionIfNonEq( Review Comment: Adding the following test to MiscITCase reproduces the issue: ```scala @Test def testCompareNullInFilter(): Unit = { checkQuery( Seq((null, 2), ("b", 1)), "SELECT f1 FROM Table1 WHERE f0 <> 'a'", Seq(Tuple1(1)) ) } ``` -- This is an automated message from the Apache Git 
Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org