[ 
https://issues.apache.org/jira/browse/FLINK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln.lee updated FLINK-7942:
-------------------------------
    Description: 
Test case *testFilterRule1* fails due to a NPE 
{code}
java.lang.RuntimeException: Error while applying rule 
FilterJoinRule:FilterJoinRule:filter, args 
[rel#148:LogicalFilter.NONE(input=rel#146:Subset#12.NONE,condition=>=(AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1,
 
AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1,
 $3), 'c0')), 'c1'), 0)), 
rel#145:LogicalJoin.NONE(left=rel#143:Subset#10.NONE,right=rel#144:Subset#11.NONE,condition==($0,
 $2),joinType=left)]

        at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
        at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
        at 
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
        at 
org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:270)
        at 
org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:347)
        at 
org.apache.flink.table.utils.BatchTableTestUtil.verifyTable(TableTestBase.scala:186)
        testFilterRule1(FilterRuleTest.scala:63)        
Caused by: java.lang.NullPointerException
        at org.apache.calcite.plan.Strong.isNull(Strong.java:110)
        at org.apache.calcite.plan.Strong.anyNull(Strong.java:166)
        at org.apache.calcite.plan.Strong.isNull(Strong.java:114)
        at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:99)
        at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:84)
        at org.apache.calcite.plan.RelOptUtil.simplifyJoin(RelOptUtil.java:2354)
        at 
org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:149)
        at 
org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:348)
        at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
{code}
but *testFilterRule2* works which has the same query written in SQL.

{code}
class FilterRuleTest extends TableTestBase {

  @Test
  def testFilterRule1(): Unit = {
    val util = batchTestUtil()
    val t1 = util.addTable[(String, Int, Int)]('a, 'b, 'c)
    val t2 = util.addTable[(String, Int, Int)]('d, 'e, 'f)
    val results = t1
      .leftOuterJoin(t2, 'b === 'e)
      .select('c, Merger('c, 'f) as 'c0)
      .select(Merger('c, 'c0) as 'c1)
      .where('c1 >= 0)

    val expected = unaryNode(
      "DataSetCalc",
      binaryNode(
        "DataSetJoin",
        unaryNode(
          "DataSetCalc",
          batchTableNode(0),
          term("select", "b", "c")
        ),
        unaryNode(
          "DataSetCalc",
          batchTableNode(1),
          term("select", "e", "f")
        ),
        term("where", "=(b, e)"),
        term("join", "b", "c", "e", "f"),
        term("joinType", "LeftOuterJoin")
      ),
      term("select", "Merger$(c, Merger$(c, f)) AS c1"),
      term("where", ">=(Merger$(c, Merger$(c, f)), 0)")
    )

    util.verifyTable(results, expected)
  }

  @Test
  def testFilterRule2(): Unit = {
    val util = batchTestUtil()
    util.addTable[(String, Int, Int)]("T1", 'a, 'b, 'c)
    util.addTable[(String, Int, Int)]("T2", 'd, 'e, 'f)
    util.tableEnv.registerFunction("udf_test", Merger)

    val sql =
      s"""
         |select c1
         |from (
         |  select udf_test(c, c0) as c1
         |  from (
         |    select c, udf_test(b, c) as c0
         |      from
         |      (select a, b, c
         |        from T1
         |        left outer join T2
         |        on T1.b = T2.e
         |      ) tmp
         |  ) tmp1
         |) tmp2
         |where c1 >= 0
       """.stripMargin

    val results = util.tableEnv.sqlQuery(sql)
    val expected = "DataSetCalc(select=[udf_test(c, udf_test(b, c)) AS c1]) \n" 
+
      "DataSetJoin(where=[=(b, e)], join=[b, c, e], 
joinType=[LeftOuterJoin])\n" +
      "DataSetCalc(select=[b, c], where=[>=(udf_test(c, udf_test(b, c)), 
0)])\n" +
      "DataSetScan(table=[[_DataSetTable_0]])\n" +
      "DataSetCalc(select=[e])\n" +
      "DataSetScan(table=[[_DataSetTable_1]])"
    util.verifyTable(results, expected)
  }
}

object Merger extends ScalarFunction {
  def eval(f0: Int, f1: Int): Int = {
    f0 + f1
  }
}
{code}

A simple way to fix this is to change the calcite class {code} 
org.apache.calcite.plan.Strong{code}
add an additional entry to the EnumMap in createPolicyMap method:
{code}map.put(SqlKind.AS, Policy.AS_IS);{code}
Either copy to Flink package and modify it  or using reflection somewhere.

I'm not sure if there exists other issues like this one since not all the types 
in SQLKind included in the Strong.MAP.
@[~fhueske]  @[~twalthr] any ideas?

  was:
Test case *testFilterRule1* fails due to a NPE 
{code}
java.lang.RuntimeException: Error while applying rule 
FilterJoinRule:FilterJoinRule:filter, args 
[rel#148:LogicalFilter.NONE(input=rel#146:Subset#12.NONE,condition=>=(AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1,
 
AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1,
 $3), 'c0')), 'c1'), 0)), 
rel#145:LogicalJoin.NONE(left=rel#143:Subset#10.NONE,right=rel#144:Subset#11.NONE,condition==($0,
 $2),joinType=left)]

        at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
        at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
        at 
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
        at 
org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:270)
        at 
org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:347)
        at 
org.apache.flink.table.utils.BatchTableTestUtil.verifyTable(TableTestBase.scala:186)
        at 
org.apache.flink.table.api.batch.table.FilterRuleITCase.testFilterRule1(FilterRuleTest.scala:63)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at 
org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
        at org.junit.rules.RunRules.evaluate(RunRules.java:20)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
        at 
com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
        at 
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
        at 
com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.lang.NullPointerException
        at org.apache.calcite.plan.Strong.isNull(Strong.java:110)
        at org.apache.calcite.plan.Strong.anyNull(Strong.java:166)
        at org.apache.calcite.plan.Strong.isNull(Strong.java:114)
        at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:99)
        at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:84)
        at org.apache.calcite.plan.RelOptUtil.simplifyJoin(RelOptUtil.java:2354)
        at 
org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:149)
        at 
org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:348)
        at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
{code}
but *testFilterRule2* works which has the same query written in SQL.

{code}
class FilterRuleTest extends TableTestBase {

  @Test
  def testFilterRule1(): Unit = {
    val util = batchTestUtil()
    val t1 = util.addTable[(String, Int, Int)]('a, 'b, 'c)
    val t2 = util.addTable[(String, Int, Int)]('d, 'e, 'f)
    val results = t1
      .leftOuterJoin(t2, 'b === 'e)
      .select('c, Merger('c, 'f) as 'c0)
      .select(Merger('c, 'c0) as 'c1)
      .where('c1 >= 0)

    val expected = unaryNode(
      "DataSetCalc",
      binaryNode(
        "DataSetJoin",
        unaryNode(
          "DataSetCalc",
          batchTableNode(0),
          term("select", "b", "c")
        ),
        unaryNode(
          "DataSetCalc",
          batchTableNode(1),
          term("select", "e", "f")
        ),
        term("where", "=(b, e)"),
        term("join", "b", "c", "e", "f"),
        term("joinType", "LeftOuterJoin")
      ),
      term("select", "Merger$(c, Merger$(c, f)) AS c1"),
      term("where", ">=(Merger$(c, Merger$(c, f)), 0)")
    )

    util.verifyTable(results, expected)
  }

  @Test
  def testFilterRule2(): Unit = {
    val util = batchTestUtil()
    util.addTable[(String, Int, Int)]("T1", 'a, 'b, 'c)
    util.addTable[(String, Int, Int)]("T2", 'd, 'e, 'f)
    util.tableEnv.registerFunction("udf_test", Merger)

    val sql =
      s"""
         |select c1
         |from (
         |  select udf_test(c, c0) as c1
         |  from (
         |    select c, udf_test(b, c) as c0
         |      from
         |      (select a, b, c
         |        from T1
         |        left outer join T2
         |        on T1.b = T2.e
         |      ) tmp
         |  ) tmp1
         |) tmp2
         |where c1 >= 0
       """.stripMargin

    val results = util.tableEnv.sqlQuery(sql)
    val expected = "DataSetCalc(select=[udf_test(c, udf_test(b, c)) AS c1]) \n" 
+
      "DataSetJoin(where=[=(b, e)], join=[b, c, e], 
joinType=[LeftOuterJoin])\n" +
      "DataSetCalc(select=[b, c], where=[>=(udf_test(c, udf_test(b, c)), 
0)])\n" +
      "DataSetScan(table=[[_DataSetTable_0]])\n" +
      "DataSetCalc(select=[e])\n" +
      "DataSetScan(table=[[_DataSetTable_1]])"
    util.verifyTable(results, expected)
  }
}

object Merger extends ScalarFunction {
  def eval(f0: Int, f1: Int): Int = {
    f0 + f1
  }
}
{code}

A simple way to fix this is to change the calcite class {code} 
org.apache.calcite.plan.Strong{code}
add an additional entry to the EnumMap in createPolicyMap method:
{code}map.put(SqlKind.AS, Policy.AS_IS);{code}
Either copy to Flink package and modify it  or using reflection somewhere.

I'm not sure if there exists other issues like this one since not all the types 
in SQLKind included in the Strong.MAP.
@[~fhueske]  @[~twalthr] any ideas?


> NPE when apply FilterJoinRule
> -----------------------------
>
>                 Key: FLINK-7942
>                 URL: https://issues.apache.org/jira/browse/FLINK-7942
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: lincoln.lee
>            Assignee: lincoln.lee
>
> Test case *testFilterRule1* fails due to a NPE 
> {code}
> java.lang.RuntimeException: Error while applying rule 
> FilterJoinRule:FilterJoinRule:filter, args 
> [rel#148:LogicalFilter.NONE(input=rel#146:Subset#12.NONE,condition=>=(AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1,
>  
> AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1,
>  $3), 'c0')), 'c1'), 0)), 
> rel#145:LogicalJoin.NONE(left=rel#143:Subset#10.NONE,right=rel#144:Subset#11.NONE,condition==($0,
>  $2),joinType=left)]
>       at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
>       at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
>       at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
>       at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:270)
>       at 
> org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:347)
>       at 
> org.apache.flink.table.utils.BatchTableTestUtil.verifyTable(TableTestBase.scala:186)
>       testFilterRule1(FilterRuleTest.scala:63)        
> Caused by: java.lang.NullPointerException
>       at org.apache.calcite.plan.Strong.isNull(Strong.java:110)
>       at org.apache.calcite.plan.Strong.anyNull(Strong.java:166)
>       at org.apache.calcite.plan.Strong.isNull(Strong.java:114)
>       at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:99)
>       at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:84)
>       at org.apache.calcite.plan.RelOptUtil.simplifyJoin(RelOptUtil.java:2354)
>       at 
> org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:149)
>       at 
> org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:348)
>       at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
> {code}
> but *testFilterRule2* works which has the same query written in SQL.
> {code}
> class FilterRuleTest extends TableTestBase {
>   @Test
>   def testFilterRule1(): Unit = {
>     val util = batchTestUtil()
>     val t1 = util.addTable[(String, Int, Int)]('a, 'b, 'c)
>     val t2 = util.addTable[(String, Int, Int)]('d, 'e, 'f)
>     val results = t1
>       .leftOuterJoin(t2, 'b === 'e)
>       .select('c, Merger('c, 'f) as 'c0)
>       .select(Merger('c, 'c0) as 'c1)
>       .where('c1 >= 0)
>     val expected = unaryNode(
>       "DataSetCalc",
>       binaryNode(
>         "DataSetJoin",
>         unaryNode(
>           "DataSetCalc",
>           batchTableNode(0),
>           term("select", "b", "c")
>         ),
>         unaryNode(
>           "DataSetCalc",
>           batchTableNode(1),
>           term("select", "e", "f")
>         ),
>         term("where", "=(b, e)"),
>         term("join", "b", "c", "e", "f"),
>         term("joinType", "LeftOuterJoin")
>       ),
>       term("select", "Merger$(c, Merger$(c, f)) AS c1"),
>       term("where", ">=(Merger$(c, Merger$(c, f)), 0)")
>     )
>     util.verifyTable(results, expected)
>   }
>   @Test
>   def testFilterRule2(): Unit = {
>     val util = batchTestUtil()
>     util.addTable[(String, Int, Int)]("T1", 'a, 'b, 'c)
>     util.addTable[(String, Int, Int)]("T2", 'd, 'e, 'f)
>     util.tableEnv.registerFunction("udf_test", Merger)
>     val sql =
>       s"""
>          |select c1
>          |from (
>          |  select udf_test(c, c0) as c1
>          |  from (
>          |    select c, udf_test(b, c) as c0
>          |      from
>          |      (select a, b, c
>          |        from T1
>          |        left outer join T2
>          |        on T1.b = T2.e
>          |      ) tmp
>          |  ) tmp1
>          |) tmp2
>          |where c1 >= 0
>        """.stripMargin
>     val results = util.tableEnv.sqlQuery(sql)
>     val expected = "DataSetCalc(select=[udf_test(c, udf_test(b, c)) AS c1]) 
> \n" +
>       "DataSetJoin(where=[=(b, e)], join=[b, c, e], 
> joinType=[LeftOuterJoin])\n" +
>       "DataSetCalc(select=[b, c], where=[>=(udf_test(c, udf_test(b, c)), 
> 0)])\n" +
>       "DataSetScan(table=[[_DataSetTable_0]])\n" +
>       "DataSetCalc(select=[e])\n" +
>       "DataSetScan(table=[[_DataSetTable_1]])"
>     util.verifyTable(results, expected)
>   }
> }
> object Merger extends ScalarFunction {
>   def eval(f0: Int, f1: Int): Int = {
>     f0 + f1
>   }
> }
> {code}
> A simple way to fix this is to change the calcite class {code} 
> org.apache.calcite.plan.Strong{code}
> add an additional entry to the EnumMap in createPolicyMap method:
> {code}map.put(SqlKind.AS, Policy.AS_IS);{code}
> Either copy to Flink package and modify it  or using reflection somewhere.
> I'm not sure if there exists other issues like this one since not all the 
> types in SQLKind included in the Strong.MAP.
> @[~fhueske]  @[~twalthr] any ideas?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to