This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 98ae790 [FLINK-19906][table-planner-blink] Fix incorrect result when
compare two binary fields
98ae790 is described below
commit 98ae79056929ef0a17968b26619dbd3d92274af1
Author: wangxlong <[email protected]>
AuthorDate: Tue Nov 17 10:05:52 2020 +0800
[FLINK-19906][table-planner-blink] Fix incorrect result when compare two
binary fields
This closes #13865
---
.../planner/codegen/calls/ScalarOperatorGens.scala | 4 ++-
.../planner/expressions/ScalarOperatorsTest.scala | 34 ++++++++++++++++++++++
.../planner/expressions/SqlExpressionTest.scala | 4 +++
.../utils/ScalarOperatorsTestBase.scala | 6 ++--
.../table/runtime/functions/SqlFunctionUtils.java | 26 +++++++++++++++++
5 files changed, 71 insertions(+), 3 deletions(-)
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index b0a7f44..567b0d7 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -27,6 +27,7 @@ import org.apache.flink.table.planner.codegen.GenerateUtils._
import
org.apache.flink.table.planner.codegen.GeneratedExpression.{ALWAYS_NULL,
NEVER_NULL, NO_CODE}
import org.apache.flink.table.planner.codegen.{CodeGenException,
CodeGeneratorContext, GeneratedExpression}
import org.apache.flink.table.planner.typeutils.TypeCoercion
+import org.apache.flink.table.runtime.functions.SqlFunctionUtils
import
org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
import org.apache.flink.table.runtime.types.PlannerTypeUtils
import org.apache.flink.table.runtime.types.PlannerTypeUtils.{isInteroperable,
isPrimitive}
@@ -562,8 +563,9 @@ object ScalarOperatorGens {
// both sides are binary type
else if (isBinaryString(left.resultType) &&
isInteroperable(left.resultType, right.resultType)) {
+ val utilName = classOf[SqlFunctionUtils].getCanonicalName
(leftTerm, rightTerm) =>
- s"java.util.Arrays.equals($leftTerm, $rightTerm)"
+ s"$utilName.byteArrayCompare($leftTerm, $rightTerm) $operator 0"
}
// both sides are same comparable type
else if (isComparable(left.resultType) &&
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
index de0899f..0bfff12 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
@@ -63,6 +63,40 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
}
@Test
+ def testCompareOperator(): Unit= {
+ // Exercises >, >=, <, <= and = on the binary fields f18 (BINARY), f19 and f20 (VARBINARY).
+ // f18 ("hello world") and f19 ("hello flink") have the same length.
+ testSqlApi(
+ "f18 > f19",
+ "true")
+ testSqlApi(
+ "f18 >= f19",
+ "true")
+ testSqlApi(
+ "f18 < f19",
+ "false")
+ testSqlApi(
+ "f18 <= f19",
+ "false")
+ testSqlApi(
+ "f18 = f18",
+ "true")
+
+ // f20 ("who") is shorter than f19, but still compares greater than it.
+ testSqlApi(
+ "f19 < f20",
+ "true")
+ // Binary literals: the hex strings decode to "hello code" and "hello cast".
+ testSqlApi(
+ "x'68656C6C6F20636F6465' < x'68656C6C6F2063617374'",
+ "false")
+ testSqlApi(
+ "x'68656C6C6F20636F6465' > x'68656C6C6F2063617374'",
+ "true")
+
+ }
+
+ @Test
def testCast(): Unit = {
// binary -> varchar
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
index 53e5f7c..7613c18 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
@@ -44,6 +44,10 @@ class SqlExpressionTest extends ExpressionTestBase {
testSqlApi("2 >= 2", "true")
testSqlApi("5 < 2", "false")
testSqlApi("2 <= 2", "true")
+ testSqlApi("x'0c' <= x'0b'", "false")
+ testSqlApi("x'0c' > x'0b'", "true")
+ testSqlApi("x'0c' = x'0c'", "true")
+ testSqlApi("x'0c' <> x'0c'", "false")
testSqlApi("1 IS NULL", "false")
testSqlApi("1 IS NOT NULL", "true")
testSqlApi("NULLIF(1,1) IS DISTINCT FROM NULLIF(1,1)", "false")
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarOperatorsTestBase.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarOperatorsTestBase.scala
index 8f9dc7a..a3968bc 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarOperatorsTestBase.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarOperatorsTestBase.scala
@@ -28,7 +28,7 @@ import org.apache.flink.types.Row
abstract class ScalarOperatorsTestBase extends ExpressionTestBase {
override def testData: Row = {
- val testData = new Row(20)
+ val testData = new Row(21)
testData.setField(0, 1: Byte)
testData.setField(1, 1: Short)
testData.setField(2, 1)
@@ -51,6 +51,7 @@ abstract class ScalarOperatorsTestBase extends
ExpressionTestBase {
DecimalDataUtils.castFrom("10.0", 19, 1).toBigDecimal)
testData.setField(18, "hello world".getBytes())
testData.setField(19, "hello flink".getBytes())
+ testData.setField(20, "who".getBytes())
testData
}
@@ -78,7 +79,8 @@ abstract class ScalarOperatorsTestBase extends
ExpressionTestBase {
DataTypes.FIELD("f16", DataTypes.DECIMAL(19, 8)),
DataTypes.FIELD("f17", DataTypes.DECIMAL(19, 1)),
DataTypes.FIELD("f18", DataTypes.BINARY(200)),
- DataTypes.FIELD("f19", DataTypes.VARBINARY(200))
+ DataTypes.FIELD("f19", DataTypes.VARBINARY(200)),
+ DataTypes.FIELD("f20", DataTypes.VARBINARY(200))
)
}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
index a02454e..c6ddce5 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
@@ -1203,4 +1203,30 @@ public class SqlFunctionUtils {
}
return x;
}
+
+ /**
+ * Compares two byte arrays in lexicographical order, treating each byte as unsigned.
+ *
+ * <p>The result is
+ * positive if {@code array1} is greater than {@code array2},
+ * negative if {@code array1} is less than {@code array2} and
+ * 0 if {@code array1} is equal to {@code array2}.
+ *
+ * <p>Note: Currently, this is used in {@code ScalarOperatorGens}
+ * for comparing two fields of binary or varbinary type.
+ *
+ * @param array1 byte array to compare.
+ * @param array2 byte array to compare.
+ * @return an int whose sign tells which array is bigger (a proper prefix is smaller)
+ */
+ public static int byteArrayCompare(byte[] array1, byte[] array2) {
+ for (int i = 0, j = 0; i < array1.length && j < array2.length;
i++, j++) {
+ int a = (array1[i] & 0xff); // mask so the byte is compared as an unsigned value
+ int b = (array2[j] & 0xff);
+ if (a != b) {
+ return a - b; // first differing position decides the order
+ }
+ }
+ return array1.length - array2.length; // shared prefix is equal: the shorter array is smaller
+ }
}