[
https://issues.apache.org/jira/browse/FLINK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
lincoln.lee updated FLINK-7942:
-------------------------------
Description:
Test case *testFilterRule1* fails due to a NPE
{code}
java.lang.RuntimeException: Error while applying rule
FilterJoinRule:FilterJoinRule:filter, args
[rel#148:LogicalFilter.NONE(input=rel#146:Subset#12.NONE,condition=>=(AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1,
AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1,
$3), 'c0')), 'c1'), 0)),
rel#145:LogicalJoin.NONE(left=rel#143:Subset#10.NONE,right=rel#144:Subset#11.NONE,condition==($0,
$2),joinType=left)]
at
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
at
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
at
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
at
org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:270)
at
org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:347)
at
org.apache.flink.table.utils.BatchTableTestUtil.verifyTable(TableTestBase.scala:186)
at
org.apache.flink.table.api.batch.table.FilterRuleITCase.testFilterRule1(FilterRuleTest.scala:63)
Caused by: java.lang.NullPointerException
at org.apache.calcite.plan.Strong.isNull(Strong.java:110)
at org.apache.calcite.plan.Strong.anyNull(Strong.java:166)
at org.apache.calcite.plan.Strong.isNull(Strong.java:114)
at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:99)
at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:84)
at org.apache.calcite.plan.RelOptUtil.simplifyJoin(RelOptUtil.java:2354)
at
org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:149)
at
org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:348)
at
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
{code}
whereas *testFilterRule2*, which expresses the same query in SQL, passes.
{code}
/**
 * Plan tests for a filter on top of a left outer join whose filtered column is
 * produced by nested `Merger` calls (FLINK-7942).
 *
 * The same scenario is written once with the Table API and once with SQL.
 */
class FilterRuleTest extends TableTestBase {

  /**
   * Table API variant: two stacked projections, each aliasing a `Merger` call,
   * followed by a filter on the outermost alias.
   *
   * According to the issue description this variant fails with a
   * NullPointerException while `FilterJoinRule` is being applied.
   */
  @Test
  def testFilterRule1(): Unit = {
    val util = batchTestUtil()
    val t1 = util.addTable[(String, Int, Int)]('a, 'b, 'c)
    val t2 = util.addTable[(String, Int, Int)]('d, 'e, 'f)

    val results = t1
      .leftOuterJoin(t2, 'b === 'e)
      .select('c, Merger('c, 'f) as 'c0)
      .select(Merger('c, 'c0) as 'c1)
      .where('c1 >= 0)

    // Expected plan: a single calc (projection + filter) on top of the join.
    val expected = unaryNode(
      "DataSetCalc",
      binaryNode(
        "DataSetJoin",
        unaryNode(
          "DataSetCalc",
          batchTableNode(0),
          term("select", "b", "c")
        ),
        unaryNode(
          "DataSetCalc",
          batchTableNode(1),
          term("select", "e", "f")
        ),
        term("where", "=(b, e)"),
        term("join", "b", "c", "e", "f"),
        term("joinType", "LeftOuterJoin")
      ),
      term("select", "Merger$(c, Merger$(c, f)) AS c1"),
      term("where", ">=(Merger$(c, Merger$(c, f)), 0)")
    )

    util.verifyTable(results, expected)
  }

  /**
   * SQL variant of the scenario, with `Merger` registered as `udf_test`.
   *
   * According to the issue description this variant passes.
   */
  @Test
  def testFilterRule2(): Unit = {
    val util = batchTestUtil()
    util.addTable[(String, Int, Int)]("T1", 'a, 'b, 'c)
    util.addTable[(String, Int, Int)]("T2", 'd, 'e, 'f)
    util.tableEnv.registerFunction("udf_test", Merger)

    // NOTE(review): the innermost call here is udf_test(b, c), whereas
    // testFilterRule1 calls Merger('c, 'f); the two queries are therefore
    // not strictly identical - confirm whether that is intended.
    val sql =
      """
        |select c1
        |from (
        | select udf_test(c, c0) as c1
        | from (
        | select c, udf_test(b, c) as c0
        | from
        | (select a, b, c
        | from T1
        | left outer join T2
        | on T1.b = T2.e
        | ) tmp
        | ) tmp1
        |) tmp2
        |where c1 >= 0
      """.stripMargin

    val results = util.tableEnv.sqlQuery(sql)

    // Each `+` must end its line: a line that starts with `+` is parsed as a
    // new statement, so the concatenation would silently stop after the first
    // literal. Every plan line is kept in a single, unbroken string literal.
    val expected =
      "DataSetCalc(select=[udf_test(c, udf_test(b, c)) AS c1]) \n" +
        "DataSetJoin(where=[=(b, e)], join=[b, c, e], joinType=[LeftOuterJoin])\n" +
        "DataSetCalc(select=[b, c], where=[>=(udf_test(c, udf_test(b, c)), 0)])\n" +
        "DataSetScan(table=[[_DataSetTable_0]])\n" +
        "DataSetCalc(select=[e])\n" +
        "DataSetScan(table=[[_DataSetTable_1]])"

    util.verifyTable(results, expected)
  }
}
/** Scalar function used by the tests; its `eval` adds the two integer arguments. */
object Merger extends ScalarFunction {

  /**
   * @param f0 first operand
   * @param f1 second operand
   * @return the sum `f0 + f1`
   */
  def eval(f0: Int, f1: Int): Int = f0 + f1
}
{code}
A simple way to fix this is to change the Calcite class
{code}org.apache.calcite.plan.Strong{code}
by adding an entry for SqlKind.AS to the EnumMap built in its createPolicyMap method:
{code}map.put(SqlKind.AS, Policy.AS_IS);{code}
We could either copy the class into the Flink code base and modify it there, or patch the map via reflection.
I'm not sure whether other issues like this one exist, since not all the kinds
in SqlKind are included in Strong.MAP.
@[~fhueske] @[~twalthr] any ideas?
was:
Test case *testFilterRule1* fails due to a NPE
{code}
java.lang.RuntimeException: Error while applying rule
FilterJoinRule:FilterJoinRule:filter, args
[rel#148:LogicalFilter.NONE(input=rel#146:Subset#12.NONE,condition=>=(AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1,
AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1,
$3), 'c0')), 'c1'), 0)),
rel#145:LogicalJoin.NONE(left=rel#143:Subset#10.NONE,right=rel#144:Subset#11.NONE,condition==($0,
$2),joinType=left)]
at
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
at
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
at
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
at
org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:270)
at
org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:347)
at
org.apache.flink.table.utils.BatchTableTestUtil.verifyTable(TableTestBase.scala:186)
at
org.apache.flink.table.api.batch.table.FilterRuleITCase.testFilterRule1(FilterRuleTest.scala:63)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at
org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at
com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
at
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
at
com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.lang.NullPointerException
at org.apache.calcite.plan.Strong.isNull(Strong.java:110)
at org.apache.calcite.plan.Strong.anyNull(Strong.java:166)
at org.apache.calcite.plan.Strong.isNull(Strong.java:114)
at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:99)
at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:84)
at org.apache.calcite.plan.RelOptUtil.simplifyJoin(RelOptUtil.java:2354)
at
org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:149)
at
org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:348)
at
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
{code}
whereas *testFilterRule2*, which expresses the same query in SQL, passes.
{code}
/**
 * Plan tests for a filter on top of a left outer join whose filtered column is
 * produced by nested `Merger` calls (FLINK-7942).
 *
 * The same scenario is written once with the Table API and once with SQL.
 */
class FilterRuleTest extends TableTestBase {

  /**
   * Table API variant: two stacked projections, each aliasing a `Merger` call,
   * followed by a filter on the outermost alias.
   *
   * According to the issue description this variant fails with a
   * NullPointerException while `FilterJoinRule` is being applied.
   */
  @Test
  def testFilterRule1(): Unit = {
    val util = batchTestUtil()
    val t1 = util.addTable[(String, Int, Int)]('a, 'b, 'c)
    val t2 = util.addTable[(String, Int, Int)]('d, 'e, 'f)

    val results = t1
      .leftOuterJoin(t2, 'b === 'e)
      .select('c, Merger('c, 'f) as 'c0)
      .select(Merger('c, 'c0) as 'c1)
      .where('c1 >= 0)

    // Expected plan: a single calc (projection + filter) on top of the join.
    val expected = unaryNode(
      "DataSetCalc",
      binaryNode(
        "DataSetJoin",
        unaryNode(
          "DataSetCalc",
          batchTableNode(0),
          term("select", "b", "c")
        ),
        unaryNode(
          "DataSetCalc",
          batchTableNode(1),
          term("select", "e", "f")
        ),
        term("where", "=(b, e)"),
        term("join", "b", "c", "e", "f"),
        term("joinType", "LeftOuterJoin")
      ),
      term("select", "Merger$(c, Merger$(c, f)) AS c1"),
      term("where", ">=(Merger$(c, Merger$(c, f)), 0)")
    )

    util.verifyTable(results, expected)
  }

  /**
   * SQL variant of the scenario, with `Merger` registered as `udf_test`.
   *
   * According to the issue description this variant passes.
   */
  @Test
  def testFilterRule2(): Unit = {
    val util = batchTestUtil()
    util.addTable[(String, Int, Int)]("T1", 'a, 'b, 'c)
    util.addTable[(String, Int, Int)]("T2", 'd, 'e, 'f)
    util.tableEnv.registerFunction("udf_test", Merger)

    // NOTE(review): the innermost call here is udf_test(b, c), whereas
    // testFilterRule1 calls Merger('c, 'f); the two queries are therefore
    // not strictly identical - confirm whether that is intended.
    val sql =
      """
        |select c1
        |from (
        | select udf_test(c, c0) as c1
        | from (
        | select c, udf_test(b, c) as c0
        | from
        | (select a, b, c
        | from T1
        | left outer join T2
        | on T1.b = T2.e
        | ) tmp
        | ) tmp1
        |) tmp2
        |where c1 >= 0
      """.stripMargin

    val results = util.tableEnv.sqlQuery(sql)

    // Each `+` must end its line: a line that starts with `+` is parsed as a
    // new statement, so the concatenation would silently stop after the first
    // literal. Every plan line is kept in a single, unbroken string literal.
    val expected =
      "DataSetCalc(select=[udf_test(c, udf_test(b, c)) AS c1]) \n" +
        "DataSetJoin(where=[=(b, e)], join=[b, c, e], joinType=[LeftOuterJoin])\n" +
        "DataSetCalc(select=[b, c], where=[>=(udf_test(c, udf_test(b, c)), 0)])\n" +
        "DataSetScan(table=[[_DataSetTable_0]])\n" +
        "DataSetCalc(select=[e])\n" +
        "DataSetScan(table=[[_DataSetTable_1]])"

    util.verifyTable(results, expected)
  }
}
/** Scalar function used by the tests; its `eval` adds the two integer arguments. */
object Merger extends ScalarFunction {

  /**
   * @param f0 first operand
   * @param f1 second operand
   * @return the sum `f0 + f1`
   */
  def eval(f0: Int, f1: Int): Int = f0 + f1
}
{code}
A simple way to fix this is to change the Calcite class
{code}org.apache.calcite.plan.Strong{code}
by adding an entry for SqlKind.AS to the EnumMap built in its createPolicyMap method:
{code}map.put(SqlKind.AS, Policy.AS_IS);{code}
We could either copy the class into the Flink code base and modify it there, or patch the map via reflection.
I'm not sure whether other issues like this one exist, since not all the kinds
in SqlKind are included in Strong.MAP.
@[~fhueske] @[~twalthr] any ideas?
> NPE when apply FilterJoinRule
> -----------------------------
>
> Key: FLINK-7942
> URL: https://issues.apache.org/jira/browse/FLINK-7942
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: lincoln.lee
> Assignee: lincoln.lee
>
> Test case *testFilterRule1* fails due to a NPE
> {code}
> java.lang.RuntimeException: Error while applying rule
> FilterJoinRule:FilterJoinRule:filter, args
> [rel#148:LogicalFilter.NONE(input=rel#146:Subset#12.NONE,condition=>=(AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1,
>
> AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1,
> $3), 'c0')), 'c1'), 0)),
> rel#145:LogicalJoin.NONE(left=rel#143:Subset#10.NONE,right=rel#144:Subset#11.NONE,condition==($0,
> $2),joinType=left)]
> at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
> at
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
> at
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:270)
> at
> org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:347)
> at
> org.apache.flink.table.utils.BatchTableTestUtil.verifyTable(TableTestBase.scala:186)
> at
> org.apache.flink.table.api.batch.table.FilterRuleITCase.testFilterRule1(FilterRuleTest.scala:63)
> Caused by: java.lang.NullPointerException
> at org.apache.calcite.plan.Strong.isNull(Strong.java:110)
> at org.apache.calcite.plan.Strong.anyNull(Strong.java:166)
> at org.apache.calcite.plan.Strong.isNull(Strong.java:114)
> at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:99)
> at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:84)
> at org.apache.calcite.plan.RelOptUtil.simplifyJoin(RelOptUtil.java:2354)
> at
> org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:149)
> at
> org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:348)
> at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
> {code}
> whereas *testFilterRule2*, which expresses the same query in SQL, passes.
> {code}
> class FilterRuleTest extends TableTestBase {
> @Test
> def testFilterRule1(): Unit = {
> val util = batchTestUtil()
> val t1 = util.addTable[(String, Int, Int)]('a, 'b, 'c)
> val t2 = util.addTable[(String, Int, Int)]('d, 'e, 'f)
> val results = t1
> .leftOuterJoin(t2, 'b === 'e)
> .select('c, Merger('c, 'f) as 'c0)
> .select(Merger('c, 'c0) as 'c1)
> .where('c1 >= 0)
> val expected = unaryNode(
> "DataSetCalc",
> binaryNode(
> "DataSetJoin",
> unaryNode(
> "DataSetCalc",
> batchTableNode(0),
> term("select", "b", "c")
> ),
> unaryNode(
> "DataSetCalc",
> batchTableNode(1),
> term("select", "e", "f")
> ),
> term("where", "=(b, e)"),
> term("join", "b", "c", "e", "f"),
> term("joinType", "LeftOuterJoin")
> ),
> term("select", "Merger$(c, Merger$(c, f)) AS c1"),
> term("where", ">=(Merger$(c, Merger$(c, f)), 0)")
> )
> util.verifyTable(results, expected)
> }
> @Test
> def testFilterRule2(): Unit = {
> val util = batchTestUtil()
> util.addTable[(String, Int, Int)]("T1", 'a, 'b, 'c)
> util.addTable[(String, Int, Int)]("T2", 'd, 'e, 'f)
> util.tableEnv.registerFunction("udf_test", Merger)
> val sql =
> s"""
> |select c1
> |from (
> | select udf_test(c, c0) as c1
> | from (
> | select c, udf_test(b, c) as c0
> | from
> | (select a, b, c
> | from T1
> | left outer join T2
> | on T1.b = T2.e
> | ) tmp
> | ) tmp1
> |) tmp2
> |where c1 >= 0
> """.stripMargin
> val results = util.tableEnv.sqlQuery(sql)
> val expected = "DataSetCalc(select=[udf_test(c, udf_test(b, c)) AS c1])
> \n" +
> "DataSetJoin(where=[=(b, e)], join=[b, c, e],
> joinType=[LeftOuterJoin])\n" +
> "DataSetCalc(select=[b, c], where=[>=(udf_test(c, udf_test(b, c)),
> 0)])\n" +
> "DataSetScan(table=[[_DataSetTable_0]])\n" +
> "DataSetCalc(select=[e])\n" +
> "DataSetScan(table=[[_DataSetTable_1]])"
> util.verifyTable(results, expected)
> }
> }
> object Merger extends ScalarFunction {
> def eval(f0: Int, f1: Int): Int = {
> f0 + f1
> }
> }
> {code}
> A simple way to fix this is to change the Calcite class
> {code}org.apache.calcite.plan.Strong{code}
> by adding an entry for SqlKind.AS to the EnumMap built in its createPolicyMap method:
> {code}map.put(SqlKind.AS, Policy.AS_IS);{code}
> We could either copy the class into the Flink code base and modify it there, or patch the map via reflection.
> I'm not sure whether other issues like this one exist, since not all the
> kinds in SqlKind are included in Strong.MAP.
> @[~fhueske] @[~twalthr] any ideas?
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)