[ https://issues.apache.org/jira/browse/FLINK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16235700#comment-16235700 ]

Fabian Hueske commented on FLINK-7942:
--------------------------------------

Julian's response on CALCITE-2023 suggests generating {{RexNode}} expressions 
without {{AS}} nodes. 
Maybe we can fix this in the Table API {{Expression}} -> {{RexNode}} 
translation.
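A rough sketch of that direction (hypothetical code, not the actual Flink translation: 
{{Expr}}, {{toRexNode}} and {{projectWithoutAs}} are illustrative names, only 
{{RelBuilder.project}} is real Calcite API): return the child's {{RexNode}} unchanged for an 
alias and hand the name to Calcite through the field-name list of the projection, so no 
{{AS}} call ({{SqlKind.AS}}) ever shows up in the plan.
{code}
import scala.collection.JavaConverters._
import org.apache.calcite.rex.RexNode
import org.apache.calcite.tools.RelBuilder

// Illustrative stand-in for the Table API expression tree (not the real class).
trait Expr { def toRexNode(relBuilder: RelBuilder): RexNode }

object AliasFreeTranslation {
  // Translate (expression, alias) pairs without wrapping them in AS RexCalls;
  // the alias only feeds the projection's output field names.
  def projectWithoutAs(
      projections: Seq[(Expr, String)],
      relBuilder: RelBuilder): RelBuilder = {
    val nodes = projections.map { case (e, _) => e.toRexNode(relBuilder) }
    val names = projections.map { case (_, n) => n }
    // RelBuilder.project(nodes, fieldNames) names the output fields directly,
    // so no AS RexCall reaches the planner rules.
    relBuilder.project(nodes.asJava, names.asJava)
  }
}
{code}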

> NPE when apply FilterJoinRule
> -----------------------------
>
>                 Key: FLINK-7942
>                 URL: https://issues.apache.org/jira/browse/FLINK-7942
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: lincoln.lee
>            Assignee: lincoln.lee
>            Priority: Major
>
> Test case *testFilterRule1* fails due to an NPE 
> {code}
> java.lang.RuntimeException: Error while applying rule FilterJoinRule:FilterJoinRule:filter, args
> [rel#148:LogicalFilter.NONE(input=rel#146:Subset#12.NONE,condition=>=(AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1, AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1, $3), 'c0')), 'c1'), 0)),
> rel#145:LogicalJoin.NONE(left=rel#143:Subset#10.NONE,right=rel#144:Subset#11.NONE,condition==($0, $2),joinType=left)]
>       at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
>       at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
>       at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
>       at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:270)
>       at org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:347)
>       at org.apache.flink.table.utils.BatchTableTestUtil.verifyTable(TableTestBase.scala:186)
>       testFilterRule1(FilterRuleTest.scala:63)
> Caused by: java.lang.NullPointerException
>       at org.apache.calcite.plan.Strong.isNull(Strong.java:110)
>       at org.apache.calcite.plan.Strong.anyNull(Strong.java:166)
>       at org.apache.calcite.plan.Strong.isNull(Strong.java:114)
>       at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:99)
>       at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:84)
>       at org.apache.calcite.plan.RelOptUtil.simplifyJoin(RelOptUtil.java:2354)
>       at org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:149)
>       at org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:348)
>       at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
> {code}
> but *testFilterRule2*, which expresses the same query in SQL, works.
> {code}
> class FilterRuleTest extends TableTestBase {
>   @Test
>   def testFilterRule1(): Unit = {
>     val util = batchTestUtil()
>     val t1 = util.addTable[(String, Int, Int)]('a, 'b, 'c)
>     val t2 = util.addTable[(String, Int, Int)]('d, 'e, 'f)
>     val results = t1
>       .leftOuterJoin(t2, 'b === 'e)
>       .select('c, Merger('c, 'f) as 'c0)
>       .select(Merger('c, 'c0) as 'c1)
>       .where('c1 >= 0)
>     val expected = unaryNode(
>       "DataSetCalc",
>       binaryNode(
>         "DataSetJoin",
>         unaryNode(
>           "DataSetCalc",
>           batchTableNode(0),
>           term("select", "b", "c")
>         ),
>         unaryNode(
>           "DataSetCalc",
>           batchTableNode(1),
>           term("select", "e", "f")
>         ),
>         term("where", "=(b, e)"),
>         term("join", "b", "c", "e", "f"),
>         term("joinType", "LeftOuterJoin")
>       ),
>       term("select", "Merger$(c, Merger$(c, f)) AS c1"),
>       term("where", ">=(Merger$(c, Merger$(c, f)), 0)")
>     )
>     util.verifyTable(results, expected)
>   }
>   @Test
>   def testFilterRule2(): Unit = {
>     val util = batchTestUtil()
>     util.addTable[(String, Int, Int)]("T1", 'a, 'b, 'c)
>     util.addTable[(String, Int, Int)]("T2", 'd, 'e, 'f)
>     util.tableEnv.registerFunction("udf_test", Merger)
>     val sql =
>       s"""
>          |select c1
>          |from (
>          |  select udf_test(c, c0) as c1
>          |  from (
>          |    select c, udf_test(b, c) as c0
>          |      from
>          |      (select a, b, c
>          |        from T1
>          |        left outer join T2
>          |        on T1.b = T2.e
>          |      ) tmp
>          |  ) tmp1
>          |) tmp2
>          |where c1 >= 0
>        """.stripMargin
>     val results = util.tableEnv.sqlQuery(sql)
>     val expected = "DataSetCalc(select=[udf_test(c, udf_test(b, c)) AS c1]) \n" +
>       "DataSetJoin(where=[=(b, e)], join=[b, c, e], joinType=[LeftOuterJoin])\n" +
>       "DataSetCalc(select=[b, c], where=[>=(udf_test(c, udf_test(b, c)), 0)])\n" +
>       "DataSetScan(table=[[_DataSetTable_0]])\n" +
>       "DataSetCalc(select=[e])\n" +
>       "DataSetScan(table=[[_DataSetTable_1]])"
>     util.verifyTable(results, expected)
>   }
> }
> object Merger extends ScalarFunction {
>   def eval(f0: Int, f1: Int): Int = {
>     f0 + f1
>   }
> }
> {code}
> A simple way to fix this is to change the Calcite class {code}org.apache.calcite.plan.Strong{code}
> by adding one more entry to the EnumMap built in its createPolicyMap method:
> {code}map.put(SqlKind.AS, Policy.AS_IS);{code}
> (The NPE occurs because Strong looks up a null-handling Policy per SqlKind and finds no entry for AS.)
> We could either copy the class into a Flink package and modify it there, or patch the map via 
> reflection somewhere; a rough sketch of the reflection variant is included below.
> I'm not sure whether there are other issues like this one, since not all SqlKind types are 
> included in Strong.MAP.
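> For illustration only (the field name MAP, its visibility, and its mutability would have to be 
> verified against the Calcite version on the classpath):
> {code}
> import org.apache.calcite.plan.Strong
> import org.apache.calcite.sql.SqlKind
>
> object StrongMapPatch {
>   // Hypothetical workaround: reflectively add the missing AS entry to
>   // org.apache.calcite.plan.Strong.MAP before the planner runs. Assumes MAP
>   // is the mutable EnumMap built by createPolicyMap, not an immutable copy.
>   def addAsPolicy(): Unit = {
>     val field = classOf[Strong].getDeclaredField("MAP")
>     field.setAccessible(true)
>     val map = field.get(null).asInstanceOf[java.util.Map[SqlKind, Strong.Policy]]
>     // Same entry the description above suggests adding to createPolicyMap.
>     map.put(SqlKind.AS, Strong.Policy.AS_IS)
>   }
> }
> {code}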
> @[~fhueske]  @[~twalthr] any ideas?


