This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 98ae790  [FLINK-19906][table-planner-blink] Fix incorrect result when 
compare two binary fields
98ae790 is described below

commit 98ae79056929ef0a17968b26619dbd3d92274af1
Author: wangxlong <[email protected]>
AuthorDate: Tue Nov 17 10:05:52 2020 +0800

    [FLINK-19906][table-planner-blink] Fix incorrect result when compare two 
binary fields
    
    This closes #13865
---
 .../planner/codegen/calls/ScalarOperatorGens.scala |  4 ++-
 .../planner/expressions/ScalarOperatorsTest.scala  | 34 ++++++++++++++++++++++
 .../planner/expressions/SqlExpressionTest.scala    |  4 +++
 .../utils/ScalarOperatorsTestBase.scala            |  6 ++--
 .../table/runtime/functions/SqlFunctionUtils.java  | 26 +++++++++++++++++
 5 files changed, 71 insertions(+), 3 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index b0a7f44..567b0d7 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -27,6 +27,7 @@ import org.apache.flink.table.planner.codegen.GenerateUtils._
 import org.apache.flink.table.planner.codegen.GeneratedExpression.{ALWAYS_NULL, NEVER_NULL, NO_CODE}
 import org.apache.flink.table.planner.codegen.{CodeGenException, CodeGeneratorContext, GeneratedExpression}
 import org.apache.flink.table.planner.typeutils.TypeCoercion
+import org.apache.flink.table.runtime.functions.SqlFunctionUtils
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
 import org.apache.flink.table.runtime.types.PlannerTypeUtils
 import org.apache.flink.table.runtime.types.PlannerTypeUtils.{isInteroperable, isPrimitive}
@@ -562,8 +563,9 @@ object ScalarOperatorGens {
       // both sides are binary type
       else if (isBinaryString(left.resultType) &&
           isInteroperable(left.resultType, right.resultType)) {
+        val utilName = classOf[SqlFunctionUtils].getCanonicalName
         (leftTerm, rightTerm) =>
-          s"java.util.Arrays.equals($leftTerm, $rightTerm)"
+          s"$utilName.byteArrayCompare($leftTerm, $rightTerm) $operator 0"
       }
       // both sides are same comparable type
       else if (isComparable(left.resultType) &&
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
index de0899f..0bfff12 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
@@ -63,6 +63,40 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
   }
 
   @Test
+  def testCompareOperator(): Unit= {
+
+    // f18 and f19 have the same length.
+    testSqlApi(
+      "f18 > f19",
+      "true")
+    testSqlApi(
+      "f18 >= f19",
+      "true")
+    testSqlApi(
+      "f18 < f19",
+      "false")
+    testSqlApi(
+      "f18 <= f19",
+      "false")
+    testSqlApi(
+      "f18 = f18",
+      "true")
+
+    // f20 is shorter than f19 but compares greater than it.
+    testSqlApi(
+      "f19 < f20",
+      "true")
+
+    testSqlApi(
+      "x'68656C6C6F20636F6465' < x'68656C6C6F2063617374'",
+      "false")
+    testSqlApi(
+      "x'68656C6C6F20636F6465' > x'68656C6C6F2063617374'",
+      "true")
+
+  }
+
+  @Test
   def testCast(): Unit = {
 
     // binary -> varchar
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
index 53e5f7c..7613c18 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
@@ -44,6 +44,10 @@ class SqlExpressionTest extends ExpressionTestBase {
     testSqlApi("2 >= 2", "true")
     testSqlApi("5 < 2", "false")
     testSqlApi("2 <= 2", "true")
+    testSqlApi("x'0c' <= x'0b'", "false")
+    testSqlApi("x'0c' > x'0b'", "true")
+    testSqlApi("x'0c' = x'0c'", "true")
+    testSqlApi("x'0c' <> x'0c'", "false")
     testSqlApi("1 IS NULL", "false")
     testSqlApi("1 IS NOT NULL", "true")
     testSqlApi("NULLIF(1,1) IS DISTINCT FROM NULLIF(1,1)", "false")
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarOperatorsTestBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarOperatorsTestBase.scala
index 8f9dc7a..a3968bc 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarOperatorsTestBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarOperatorsTestBase.scala
@@ -28,7 +28,7 @@ import org.apache.flink.types.Row
 abstract class ScalarOperatorsTestBase extends ExpressionTestBase {
 
   override def testData: Row = {
-    val testData = new Row(20)
+    val testData = new Row(21)
     testData.setField(0, 1: Byte)
     testData.setField(1, 1: Short)
     testData.setField(2, 1)
@@ -51,6 +51,7 @@ abstract class ScalarOperatorsTestBase extends ExpressionTestBase {
         DecimalDataUtils.castFrom("10.0", 19, 1).toBigDecimal)
     testData.setField(18, "hello world".getBytes())
     testData.setField(19, "hello flink".getBytes())
+    testData.setField(20, "who".getBytes())
     testData
   }
 
@@ -78,7 +79,8 @@ abstract class ScalarOperatorsTestBase extends ExpressionTestBase {
         DataTypes.FIELD("f16", DataTypes.DECIMAL(19, 8)),
         DataTypes.FIELD("f17", DataTypes.DECIMAL(19, 1)),
         DataTypes.FIELD("f18", DataTypes.BINARY(200)),
-        DataTypes.FIELD("f19", DataTypes.VARBINARY(200))
+        DataTypes.FIELD("f19", DataTypes.VARBINARY(200)),
+        DataTypes.FIELD("f20", DataTypes.VARBINARY(200))
     )
   }
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
index a02454e..c6ddce5 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
@@ -1203,4 +1203,30 @@ public class SqlFunctionUtils {
                }
                return x;
        }
+
+       /**
+        * Compares two byte arrays in lexicographical order, treating each byte as unsigned.
+        *
+        * <p>The result is
+        * positive if {@code array1} is greater than {@code array2},
+        * negative if {@code array1} is less than {@code array2} and
+        * 0 if {@code array1} is equal to {@code array2}.
+        *
+        * <p>Note: Currently, this is used in {@code ScalarOperatorGens}
+        * for comparing two fields of binary or varbinary type.
+        *
+        * @param array1 byte array to compare.
+        * @param array2 byte array to compare.
+        * @return a negative, zero, or positive int if array1 is less than, equal to, or greater than array2
+        */
+       public static int byteArrayCompare(byte[] array1, byte[] array2) {
+               for (int i = 0, j = 0; i < array1.length && j < array2.length; i++, j++) {
+                       int a = (array1[i] & 0xff);
+                       int b = (array2[j] & 0xff);
+                       if (a != b) {
+                               return a - b;
+                       }
+               }
+               return array1.length - array2.length;
+       }
 }

Reply via email to