This is an automated email from the ASF dual-hosted git repository.
libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4d37b8c34ff [FLINK-33313][table] Fix
RexNodeExtractor#extractConjunctiveConditions throws an Exception when handle
binary literal
4d37b8c34ff is described below
commit 4d37b8c34ff062b7505ab8c0ca8f2181768aab60
Author: zoudan <[email protected]>
AuthorDate: Thu Oct 19 17:48:05 2023 +0800
[FLINK-33313][table] Fix RexNodeExtractor#extractConjunctiveConditions
throws an Exception when handle binary literal
Close apache/flink#23551
---
.../table/planner/plan/utils/RexNodeExtractor.scala | 4 ++++
.../plan/utils/NestedProjectionUtilTest.scala | 10 ++++++----
.../planner/plan/utils/RexNodeExtractorTest.scala | 21 +++++++++++++++++++++
.../planner/plan/utils/RexNodeRewriterTest.scala | 7 ++++---
.../table/planner/plan/utils/RexNodeTestBase.scala | 6 ++++--
5 files changed, 39 insertions(+), 9 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
index 482ce56dc63..481cbda8b82 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
@@ -37,6 +37,7 @@ import
org.apache.flink.table.types.logical.YearMonthIntervalType
import org.apache.flink.table.types.utils.TypeConversions
import org.apache.flink.util.Preconditions
+import org.apache.calcite.avatica.util.ByteString
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex._
@@ -502,6 +503,9 @@ class RexNodeToExpressionConverter(
// convert to BigDecimal
literal.getValueAs(classOf[java.math.BigDecimal])
+ case BINARY | VARBINARY =>
+ literal.getValueAs(classOf[Array[Byte]])
+
case _ =>
literal.getValue
}
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
index ec8214f5b91..9cd44c9fea7 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
@@ -87,12 +87,13 @@ class NestedProjectionUtilTest extends RexNodeTestBase {
"$2",
"$3",
"$4",
+ "$5",
"*($t2, $t3)",
"100",
- "<($t5, $t6)",
+ "<($t6, $t7)",
"6",
- ">($t1, $t8)",
- "AND($t7, $t9)")))
+ ">($t1, $t9)",
+ "AND($t8, $t10)")))
val nestedField = NestedProjectionUtil.build(exprs,
rexProgram.getInputRowType)
val paths = NestedProjectionUtil.convertToIndexArray(nestedField)
@@ -101,7 +102,8 @@ class NestedProjectionUtilTest extends RexNodeTestBase {
Array(1),
Array(2),
Array(3),
- Array(4)
+ Array(4),
+ Array(5)
)
assertArray(paths, orderedPaths)
val builder = new FlinkRexBuilder(typeFactory)
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
index 2eb87e35cc8..bd5f15d3bed 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
@@ -33,6 +33,7 @@ import
org.apache.flink.table.planner.utils.{DateTimeTestUtil, IntSumAggFunction
import org.apache.flink.table.resource.ResourceManager
import org.apache.flink.table.utils.CatalogManagerMocks
+import org.apache.calcite.avatica.util.ByteString
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex.{RexBuilder, RexNode}
import org.apache.calcite.sql.`type`.SqlTypeName
@@ -145,6 +146,26 @@ class RexNodeExtractorTest extends RexNodeTestBase {
assertEquals(0, unconvertedRexNodes.length)
}
+ @Test
+ def testExtractConditionWithBinaryLiteral(): Unit = {
+ // blob
+ val t0 = rexBuilder.makeInputRef(allFieldTypes.get(5), 5)
+
+ // X'616263'
+ val t1 = rexBuilder.makeBinaryLiteral(ByteString.of("616263", 16))
+
+ // blob = X'616263'
+ val a = rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, t0, t1)
+
+ val relBuilder: RexBuilder = new FlinkRexBuilder(typeFactory)
+ val (convertedExpressions, unconvertedRexNodes) =
+ extractConjunctiveConditions(a, -1, allFieldNames, relBuilder,
functionCatalog)
+
+ val expected: Array[Expression] = Array($"blob" === Array[Byte](97, 98,
99))
+ assertExpressionArrayEquals(expected, convertedExpressions)
+ assertEquals(0, unconvertedRexNodes.length)
+ }
+
// ((a AND b) OR c) AND (NOT d) => (a OR c) AND (b OR c) AND (NOT d)
@Test
def testExtractCnfCondition(): Unit = {
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeRewriterTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeRewriterTest.scala
index 0cea5c8be69..57a0edda39e 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeRewriterTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeRewriterTest.scala
@@ -39,12 +39,13 @@ class RexNodeRewriterTest extends RexNodeTestBase {
"$2",
"$3",
"$4",
+ "$5",
"*($t2, $t3)",
"100",
- "<($t5, $t6)",
+ "<($t6, $t7)",
"6",
- ">($t1, $t8)",
- "AND($t7, $t9)")))
+ ">($t1, $t9)",
+ "AND($t8, $t10)")))
// use amount, id, price fields to create a new RexProgram
val usedFields = Array(2, 3, 1)
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeTestBase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeTestBase.scala
index ec326c2a540..1ba968b3253 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeTestBase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeTestBase.scala
@@ -39,14 +39,16 @@ abstract class RexNodeTestBase {
val typeFactory: FlinkTypeFactory = new FlinkTypeFactory(
Thread.currentThread().getContextClassLoader)
- val allFieldNames: java.util.List[String] = List("name", "id", "amount",
"price", "flag").asJava
+ val allFieldNames: java.util.List[String] =
+ List("name", "id", "amount", "price", "flag", "blob").asJava
val allFieldTypes: java.util.List[RelDataType] = List(
DataTypes.VARCHAR(100),
DataTypes.BIGINT(),
DataTypes.INT(),
DataTypes.DOUBLE(),
- DataTypes.BOOLEAN())
+ DataTypes.BOOLEAN(),
+ DataTypes.BYTES())
.map(LogicalTypeDataTypeConverter.fromDataTypeToLogicalType)
.map(typeFactory.createFieldTypeFromLogicalType)
.asJava