This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f9dfa1833f2 [FLINK-36415][table] ProjectWatermarkAssignerTransposeRule should not generate extra casts
f9dfa1833f2 is described below

commit f9dfa1833f2c37d155cb8180ed4f4aec182ca537
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Tue Oct 1 16:51:46 2024 +0200

    [FLINK-36415][table] ProjectWatermarkAssignerTransposeRule should not generate extra casts
    
    
    ---------
    
    Co-authored-by: Dawid Wysakowicz <[email protected]>
---
 .../planner/plan/utils/NestedProjectionUtil.scala  | 24 ++++++++++++++++++----
 .../PushProjectIntoTableSourceScanRuleTest.java    |  9 ++++++++
 .../PushProjectIntoTableSourceScanRuleTest.xml     | 17 +++++++++++++++
 3 files changed, 46 insertions(+), 4 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtil.scala
index d5123779f45..582ace03b2d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtil.scala
@@ -22,6 +22,7 @@ import 
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushD
 
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
 
 import java.util.{LinkedHashMap => JLinkedHashMap, LinkedList => JLinkedList, 
List => JList}
 
@@ -222,10 +223,10 @@ private class NestedSchemaRewriter(schema: NestedSchema, 
builder: RexBuilder) ex
         if (parent.isLeaf) {
           (
             Some(
-              builder.makeFieldAccess(
+              copyFieldAccess(
                 new RexInputRef(parent.indexOfLeafInNewSchema, 
parent.originFieldType),
                 fieldAccess.getField.getName,
-                true)),
+                builder)),
             parent)
         } else {
           val child = parent.children.get(fieldAccess.getField.getName)
@@ -238,7 +239,7 @@ private class NestedSchemaRewriter(schema: NestedSchema, 
builder: RexBuilder) ex
       case acc: RexFieldAccess =>
         val (field, parent) = traverse(acc)
         if (field.isDefined) {
-          (Some(builder.makeFieldAccess(field.get, 
fieldAccess.getField.getName, true)), parent)
+          (Some(copyFieldAccess(field.get, fieldAccess.getField.getName, 
builder)), parent)
         } else {
           val child = parent.children.get(fieldAccess.getField.getName)
           if (child.isLeaf) {
@@ -251,7 +252,22 @@ private class NestedSchemaRewriter(schema: NestedSchema, 
builder: RexBuilder) ex
         // rewrite operands of the expression
         val newExpr = expr.accept(this)
         // rebuild FieldAccess
-        (Some(builder.makeFieldAccess(newExpr, fieldAccess.getField.getName, 
true)), null)
+        (Some(copyFieldAccess(newExpr, fieldAccess.getField.getName, 
builder)), null)
+    }
+  }
+
+  // Extra CASTs should be avoided since only need to copy FieldAccess
+  private def copyFieldAccess(
+      newExpr: RexNode,
+      fieldName: String,
+      rexBuilder: RexBuilder): RexNode = {
+    // rebuild fieldAccess
+    val fieldAccess = rexBuilder.makeFieldAccess(newExpr, fieldName, true)
+    fieldAccess match {
+      case call: RexCall =>
+        call.getOperands.get(0)
+      case _ =>
+        fieldAccess
     }
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
index 2a6c770cd66..55cbdddcb35 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
@@ -175,6 +175,10 @@ class PushProjectIntoTableSourceScanRuleTest extends 
TableTestBase {
                         + "    `data_map` MAP<STRING, ROW<`value` BIGINT>>>,\n"
                         + "  `outer_array` ARRAY<INT>,\n"
                         + "  `outer_map` MAP<STRING, STRING>,\n"
+                        + "  `chart` ROW<"
+                        + "    `result` ARRAY<ROW<`meta` ROW<"
+                        + "                         `symbol` STRING NOT NULL> 
NOT NULL> NOT NULL>"
+                        + "     NOT NULL>,\n"
                         + "   WATERMARK FOR `Timestamp` AS `Timestamp`\n"
                         + ") WITH (\n"
                         + " 'connector' = 'values',\n"
@@ -297,6 +301,11 @@ class PushProjectIntoTableSourceScanRuleTest extends 
TableTestBase {
                         + "FROM NestedItemTable");
     }
 
+    @Test
+    void testNestedProjectFieldAccessWithNestedArrayAndRows() {
+        util.verifyRelPlan("SELECT `chart`.`result`[1].`meta`.`symbol` FROM 
ItemTable");
+    }
+
     @Test
     void testNestedProjectFieldAccessWithITEMContainsTopLevelAccess() {
         util.verifyRelPlan(
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml
index 3c90cf08bcf..2550377ba69 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml
@@ -186,6 +186,23 @@ LogicalProject(EXPR$0=[ITEM($0, $2).value], 
EXPR$1=[ITEM($1, _UTF-16LE'item').va
 ]]>
     </Resource>
   </TestCase>
+       <TestCase name="testNestedProjectFieldAccessWithNestedArrayAndRows">
+               <Resource name="sql">
+                       <![CDATA[SELECT `chart`.`result`[1].`meta`.`symbol` 
FROM ItemTable]]>
+               </Resource>
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(EXPR$0=[CAST(CAST(ITEM($5.result, 
1).meta):RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
NOT NULL symbol).symbol):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
++- LogicalTableScan(table=[[default_catalog, default_database, ItemTable]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+LogicalProject(EXPR$0=[CAST(CAST(ITEM($0.result, 
1).meta):RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
NOT NULL symbol).symbol):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
++- LogicalTableScan(table=[[default_catalog, default_database, ItemTable, 
project=[chart], metadata=[]]])
+]]>
+               </Resource>
+       </TestCase>
   <TestCase name="testNestedProjectFieldAccessWithITEMContainsTopLevelAccess">
     <Resource name="sql">
       <![CDATA[SELECT `Result`.`Mid`.data_arr[2].`value`, 
`Result`.`Mid`.data_arr[ID].`value`, `Result`.`Mid`.data_map['item'].`value`, 
`Result`.`Mid` FROM NestedItemTable]]>

Reply via email to