This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f9dfa1833f2 [FLINK-36415][table] ProjectWatermarkAssignerTransposeRule should not generate extra casts
f9dfa1833f2 is described below
commit f9dfa1833f2c37d155cb8180ed4f4aec182ca537
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Tue Oct 1 16:51:46 2024 +0200
[FLINK-36415][table] ProjectWatermarkAssignerTransposeRule should not generate extra casts
---------
Co-authored-by: Dawid Wysakowicz <[email protected]>
---
.../planner/plan/utils/NestedProjectionUtil.scala | 24 ++++++++++++++++++----
.../PushProjectIntoTableSourceScanRuleTest.java | 9 ++++++++
.../PushProjectIntoTableSourceScanRuleTest.xml | 17 +++++++++++++++
3 files changed, 46 insertions(+), 4 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtil.scala
index d5123779f45..582ace03b2d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtil.scala
@@ -22,6 +22,7 @@ import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushD
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
import java.util.{LinkedHashMap => JLinkedHashMap, LinkedList => JLinkedList, List => JList}
@@ -222,10 +223,10 @@ private class NestedSchemaRewriter(schema: NestedSchema, builder: RexBuilder) ex
if (parent.isLeaf) {
(
Some(
- builder.makeFieldAccess(
+ copyFieldAccess(
new RexInputRef(parent.indexOfLeafInNewSchema, parent.originFieldType),
fieldAccess.getField.getName,
- true)),
+ builder)),
parent)
} else {
val child = parent.children.get(fieldAccess.getField.getName)
@@ -238,7 +239,7 @@ private class NestedSchemaRewriter(schema: NestedSchema, builder: RexBuilder) ex
case acc: RexFieldAccess =>
val (field, parent) = traverse(acc)
if (field.isDefined) {
- (Some(builder.makeFieldAccess(field.get, fieldAccess.getField.getName, true)), parent)
+ (Some(copyFieldAccess(field.get, fieldAccess.getField.getName, builder)), parent)
} else {
val child = parent.children.get(fieldAccess.getField.getName)
if (child.isLeaf) {
@@ -251,7 +252,22 @@ private class NestedSchemaRewriter(schema: NestedSchema, builder: RexBuilder) ex
// rewrite operands of the expression
val newExpr = expr.accept(this)
// rebuild FieldAccess
- (Some(builder.makeFieldAccess(newExpr, fieldAccess.getField.getName, true)), null)
+ (Some(copyFieldAccess(newExpr, fieldAccess.getField.getName, builder)), null)
+ }
+ }
+
+ // Extra CASTs should be avoided since only need to copy FieldAccess
+ private def copyFieldAccess(
+ newExpr: RexNode,
+ fieldName: String,
+ rexBuilder: RexBuilder): RexNode = {
+ // rebuild fieldAccess
+ val fieldAccess = rexBuilder.makeFieldAccess(newExpr, fieldName, true)
+ fieldAccess match {
+ case call: RexCall =>
+ call.getOperands.get(0)
+ case _ =>
+ fieldAccess
}
}
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
index 2a6c770cd66..55cbdddcb35 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
@@ -175,6 +175,10 @@ class PushProjectIntoTableSourceScanRuleTest extends TableTestBase {
+ " `data_map` MAP<STRING, ROW<`value` BIGINT>>>,\n"
+ " `outer_array` ARRAY<INT>,\n"
+ " `outer_map` MAP<STRING, STRING>,\n"
+ + " `chart` ROW<"
+ + " `result` ARRAY<ROW<`meta` ROW<"
+ + " `symbol` STRING NOT NULL> NOT NULL> NOT NULL>"
+ + " NOT NULL>,\n"
+ " WATERMARK FOR `Timestamp` AS `Timestamp`\n"
+ ") WITH (\n"
+ " 'connector' = 'values',\n"
@@ -297,6 +301,11 @@ class PushProjectIntoTableSourceScanRuleTest extends TableTestBase {
+ "FROM NestedItemTable");
}
+ @Test
+ void testNestedProjectFieldAccessWithNestedArrayAndRows() {
+ util.verifyRelPlan("SELECT `chart`.`result`[1].`meta`.`symbol` FROM ItemTable");
+ }
+
@Test
void testNestedProjectFieldAccessWithITEMContainsTopLevelAccess() {
util.verifyRelPlan(
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml
index 3c90cf08bcf..2550377ba69 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml
@@ -186,6 +186,23 @@ LogicalProject(EXPR$0=[ITEM($0, $2).value], EXPR$1=[ITEM($1, _UTF-16LE'item').va
]]>
</Resource>
</TestCase>
+ <TestCase name="testNestedProjectFieldAccessWithNestedArrayAndRows">
+ <Resource name="sql">
+ <![CDATA[SELECT `chart`.`result`[1].`meta`.`symbol` FROM ItemTable]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(EXPR$0=[CAST(CAST(ITEM($5.result, 1).meta):RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL symbol).symbol):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
++- LogicalTableScan(table=[[default_catalog, default_database, ItemTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(EXPR$0=[CAST(CAST(ITEM($0.result, 1).meta):RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL symbol).symbol):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
++- LogicalTableScan(table=[[default_catalog, default_database, ItemTable, project=[chart], metadata=[]]])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testNestedProjectFieldAccessWithITEMContainsTopLevelAccess">
<Resource name="sql">
<![CDATA[SELECT `Result`.`Mid`.data_arr[2].`value`, `Result`.`Mid`.data_arr[ID].`value`, `Result`.`Mid`.data_map['item'].`value`, `Result`.`Mid` FROM NestedItemTable]]>