[ https://issues.apache.org/jira/browse/FLINK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16253293#comment-16253293 ]

ASF GitHub Bot commented on FLINK-7942:
---------------------------------------

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/5019

    [FLINK-7942] [table] Reduce aliasing in RexNodes

    ## What is the purpose of the change
    
    This PR reduces the number of `AS` expressions in projections. This fixes the bug with the `FilterJoinRule` reported in FLINK-7942 and improves the plans: many Calc operations are no longer needed. For multi-windows, the change caused problems that are not trivial to fix, so they still use aliasing to resolve time attributes.
    
    ## Brief change log
    
    - Remove explicit alias expression from RexNodes
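The change-log item can be illustrated with a toy Scala model. This is not Flink's or Calcite's code: it only shows the idea that nested alias wrappers are stripped from an expression, and that a top-level alias survives solely as the output field name of the projection, not as an `AS` call. The classes `Field`, `Udf`, `Alias`, the helpers `stripAliases`, `project`, `render`, and the `_c<i>` fallback naming are all hypothetical.

```scala
// Toy model of "remove explicit alias expressions"; NOT Flink's or Calcite's classes.
object AliasModel {
  sealed trait Expr
  final case class Field(name: String) extends Expr
  final case class Udf(name: String, args: List[Expr]) extends Expr
  final case class Alias(child: Expr, name: String) extends Expr

  // Drop every Alias wrapper, however deeply it is nested.
  def stripAliases(e: Expr): Expr = e match {
    case Alias(child, _) => stripAliases(child)
    case Udf(n, args)    => Udf(n, args.map(stripAliases))
    case f: Field        => f
  }

  // Build a projection as (outputName, expression) pairs. A top-level alias only
  // names the output column; the stored expression contains no Alias nodes.
  // The "_c<i>" fallback name is a hypothetical convention for this sketch.
  def project(exprs: List[Expr]): List[(String, Expr)] =
    exprs.zipWithIndex.map {
      case (Alias(child, name), _) => (name, stripAliases(child))
      case (other, i)              => (s"_c$i", stripAliases(other))
    }

  // Render in a plan-like syntax, showing AS calls when they are present.
  def render(e: Expr): String = e match {
    case Field(n)     => n
    case Udf(n, args) => args.map(render).mkString(s"$n(", ", ", ")")
    case Alias(c, n)  => s"AS(${render(c)}, '$n')"
  }

  def main(args: Array[String]): Unit = {
    // Shape of the merged projection from the issue: nested aliases c0 and c1.
    val merged = Alias(Udf("Merger", List(Field("c"),
      Alias(Udf("Merger", List(Field("c"), Field("f"))), "c0"))), "c1")
    println(render(merged)) // AS(Merger(c, AS(Merger(c, f), 'c0')), 'c1')
    project(List(merged)).foreach { case (name, e) =>
      println(s"$name := ${render(e)}") // c1 := Merger(c, Merger(c, f))
    }
  }
}
```

After stripping, a rule that inspects the filter condition only sees the UDF calls, which is the same shape the SQL path in the issue produces.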
    
    
    ## Verifying this change
    
    - Added `testFilterJoinRule` to `JoinTest`.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
      - The S3 file system connector: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? no
      - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink FLINK-7942

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5019.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5019
    
----
commit 82356f22154c563fcd8bd018dd954fe274e7fe5f
Author: twalthr <twal...@apache.org>
Date:   2017-11-15T11:07:16Z

    [FLINK-7942] [table] Reduce aliasing in RexNodes

----


> NPE when apply FilterJoinRule
> -----------------------------
>
>                 Key: FLINK-7942
>                 URL: https://issues.apache.org/jira/browse/FLINK-7942
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: lincoln.lee
>            Assignee: Timo Walther
>
> Test case *testFilterRule1* fails with an NPE:
> {code}
> java.lang.RuntimeException: Error while applying rule FilterJoinRule:FilterJoinRule:filter, args [rel#148:LogicalFilter.NONE(input=rel#146:Subset#12.NONE,condition=>=(AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1, AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1, $3), 'c0')), 'c1'), 0)), rel#145:LogicalJoin.NONE(left=rel#143:Subset#10.NONE,right=rel#144:Subset#11.NONE,condition==($0, $2),joinType=left)]
>       at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
>       at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
>       at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
>       at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:270)
>       at org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:347)
>       at org.apache.flink.table.utils.BatchTableTestUtil.verifyTable(TableTestBase.scala:186)
>       testFilterRule1(FilterRuleTest.scala:63)
> Caused by: java.lang.NullPointerException
>       at org.apache.calcite.plan.Strong.isNull(Strong.java:110)
>       at org.apache.calcite.plan.Strong.anyNull(Strong.java:166)
>       at org.apache.calcite.plan.Strong.isNull(Strong.java:114)
>       at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:99)
>       at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:84)
>       at org.apache.calcite.plan.RelOptUtil.simplifyJoin(RelOptUtil.java:2354)
>       at org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:149)
>       at org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:348)
>       at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
> {code}
> but *testFilterRule2*, which expresses a similar query in SQL, passes.
> {code}
> import org.apache.flink.api.scala._
> import org.apache.flink.table.api.scala._
> import org.apache.flink.table.functions.ScalarFunction
> import org.apache.flink.table.utils.TableTestBase
> import org.apache.flink.table.utils.TableTestUtil._
> import org.junit.Test
>
> class FilterRuleTest extends TableTestBase {
>   // Table API version: fails with the NPE shown above
>   @Test
>   def testFilterRule1(): Unit = {
>     val util = batchTestUtil()
>     val t1 = util.addTable[(String, Int, Int)]('a, 'b, 'c)
>     val t2 = util.addTable[(String, Int, Int)]('d, 'e, 'f)
>     val results = t1
>       .leftOuterJoin(t2, 'b === 'e)
>       .select('c, Merger('c, 'f) as 'c0)
>       .select(Merger('c, 'c0) as 'c1)
>       .where('c1 >= 0)
>     val expected = unaryNode(
>       "DataSetCalc",
>       binaryNode(
>         "DataSetJoin",
>         unaryNode(
>           "DataSetCalc",
>           batchTableNode(0),
>           term("select", "b", "c")
>         ),
>         unaryNode(
>           "DataSetCalc",
>           batchTableNode(1),
>           term("select", "e", "f")
>         ),
>         term("where", "=(b, e)"),
>         term("join", "b", "c", "e", "f"),
>         term("joinType", "LeftOuterJoin")
>       ),
>       term("select", "Merger$(c, Merger$(c, f)) AS c1"),
>       term("where", ">=(Merger$(c, Merger$(c, f)), 0)")
>     )
>     util.verifyTable(results, expected)
>   }
>
>   // Similar query written in SQL: this test passes
>   @Test
>   def testFilterRule2(): Unit = {
>     val util = batchTestUtil()
>     util.addTable[(String, Int, Int)]("T1", 'a, 'b, 'c)
>     util.addTable[(String, Int, Int)]("T2", 'd, 'e, 'f)
>     util.tableEnv.registerFunction("udf_test", Merger)
>     val sql =
>       s"""
>          |select c1
>          |from (
>          |  select udf_test(c, c0) as c1
>          |  from (
>          |    select c, udf_test(b, c) as c0
>          |      from
>          |      (select a, b, c
>          |        from T1
>          |        left outer join T2
>          |        on T1.b = T2.e
>          |      ) tmp
>          |  ) tmp1
>          |) tmp2
>          |where c1 >= 0
>        """.stripMargin
>     val results = util.tableEnv.sqlQuery(sql)
>     val expected = "DataSetCalc(select=[udf_test(c, udf_test(b, c)) AS c1])\n" +
>       "DataSetJoin(where=[=(b, e)], join=[b, c, e], joinType=[LeftOuterJoin])\n" +
>       "DataSetCalc(select=[b, c], where=[>=(udf_test(c, udf_test(b, c)), 0)])\n" +
>       "DataSetScan(table=[[_DataSetTable_0]])\n" +
>       "DataSetCalc(select=[e])\n" +
>       "DataSetScan(table=[[_DataSetTable_1]])"
>     util.verifyTable(results, expected)
>   }
> }
>
> // Scalar UDF used by both tests
> object Merger extends ScalarFunction {
>   def eval(f0: Int, f1: Int): Int = {
>     f0 + f1
>   }
> }
> {code}
> A simple way to fix this is to change the Calcite class {code}org.apache.calcite.plan.Strong{code} and add an entry to the EnumMap in its createPolicyMap method:
> {code}map.put(SqlKind.AS, Policy.AS_IS);{code}
> This could be done either by copying the class into a Flink package and modifying it there, or by using reflection.
> I'm not sure whether other issues like this one exist, since not all kinds in SqlKind are included in Strong.MAP.
> @[~fhueske] @[~twalthr] any ideas?
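The failure mode and the proposed fix can be made concrete with a toy Scala model of the null-propagation check. This is a sketch under stated assumptions, not Calcite's code: it assumes, as the issue text and the stack trace suggest, that `Strong` looks up a per-`SqlKind` policy in a static map (the `Strong.MAP` built by `createPolicyMap`) and branches on the result, so a kind with no entry yields `null` and the Java code throws the NPE seen in `Strong.isNull`. All names here (`Kind`, `Policy`, `Expr`, `basePolicies`, `patchedPolicies`, `nullColumns`) are hypothetical stand-ins, the `Merger` UDF calls inside the `AS` are elided, and a missing policy is reported as a `Left` instead of an exception.

```scala
// Hypothetical, simplified model of Calcite's Strong null-propagation check.
// Kind/Policy/Expr are stand-ins for SqlKind, Strong.Policy and RexNode.
object StrongModel {
  sealed trait Kind
  case object GreaterThanOrEqual extends Kind
  case object InputRef extends Kind
  case object Literal extends Kind
  case object As extends Kind

  sealed trait Policy
  case object NotNullPolicy extends Policy // never null
  case object AnyPolicy extends Policy     // null if any operand is null
  case object AsIsPolicy extends Policy    // decided by the node itself

  sealed trait Expr { def kind: Kind }
  final case class Ref(index: Int) extends Expr { val kind: Kind = InputRef }
  final case class Lit(value: Option[Int]) extends Expr { val kind: Kind = Literal }
  final case class Call(kind: Kind, operands: List[Expr]) extends Expr

  // Policy table with no entry for AS, mirroring the gap described in the issue.
  val basePolicies: Map[Kind, Policy] =
    Map(GreaterThanOrEqual -> AnyPolicy, InputRef -> AsIsPolicy, Literal -> AsIsPolicy)
  // The reporter's proposed fix: map AS to AS_IS.
  val patchedPolicies: Map[Kind, Policy] = basePolicies.updated(As, AsIsPolicy)

  // Is `e` definitely null when the columns in `nullColumns` are null?
  // Left(...) models the case where the Java original hits a null policy (NPE).
  def isNull(e: Expr, policies: Map[Kind, Policy], nullColumns: Set[Int]): Either[String, Boolean] =
    policies.get(e.kind) match {
      case None                => Left(s"no policy for ${e.kind}")
      case Some(NotNullPolicy) => Right(false)
      case Some(AnyPolicy) => e match {
        case Call(_, ops) => anyNull(ops, policies, nullColumns)
        case _            => Right(false)
      }
      case Some(AsIsPolicy) => e match {
        case Lit(v) => Right(v.isEmpty)
        case Ref(i) => Right(nullColumns.contains(i))
        case _      => Right(false) // e.g. AS after the patch: not known to be null
      }
    }

  // True as soon as one operand is definitely null; propagates a Left unchanged.
  def anyNull(ops: List[Expr], policies: Map[Kind, Policy], nullColumns: Set[Int]): Either[String, Boolean] =
    ops.foldLeft[Either[String, Boolean]](Right(false)) { (acc, op) =>
      acc.flatMap(found => if (found) Right(true) else isNull(op, policies, nullColumns))
    }

  // Shape of the failing filter condition: >=(AS(<expr over $3>, 'c1'), 0).
  val condition: Expr = Call(GreaterThanOrEqual, List(Call(As, List(Ref(3))), Lit(Some(0))))

  def main(args: Array[String]): Unit = {
    val nullColumns = Set(2, 3) // right-side columns of the LEFT join ($2, $3)
    println(isNull(condition, basePolicies, nullColumns))    // Left(no policy for As)
    println(isNull(condition, patchedPolicies, nullColumns)) // Right(false)
  }
}
```

With the extra entry, `AS` falls through to the "as is" handling and the check completes; whether `AS_IS` is the semantically right policy for `AS` is exactly the open question the reporter raises.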



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)