Repository: flink Updated Branches: refs/heads/master 691c48a14 -> 0a22acef4
[FLINK-8095] [table] Introduce ProjectSetOpTransposeRule This closes #5026. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a22acef Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a22acef Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a22acef Branch: refs/heads/master Commit: 0a22acef41ede452b3df1a9874b5ae4a336d8a77 Parents: 691c48a Author: Xpray <[email protected]> Authored: Fri Nov 17 11:01:27 2017 +0800 Committer: twalthr <[email protected]> Committed: Mon Nov 20 12:49:13 2017 +0100 ---------------------------------------------------------------------- .../flink/table/plan/rules/FlinkRuleSets.scala | 2 + .../api/batch/table/SetOperatorsTest.scala | 58 ++++++++++++++++++++ .../api/stream/table/SetOperatorsTest.scala | 29 ++++++++++ .../plan/TimeIndicatorConversionTest.scala | 16 ++++-- 4 files changed, 99 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0a22acef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala index a20d14f..10d6881 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala @@ -54,6 +54,8 @@ object FlinkRuleSets { FilterAggregateTransposeRule.INSTANCE, // push filter through set operation FilterSetOpTransposeRule.INSTANCE, + // push project through set operation + ProjectSetOpTransposeRule.INSTANCE, // aggregation and projection rules AggregateProjectMergeRule.INSTANCE, http://git-wip-us.apache.org/repos/asf/flink/blob/0a22acef/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala index 35f4429..929ce9c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala @@ -215,4 +215,62 @@ class SetOperatorsTest extends TableTestBase { util.verifyTable(result, expected) } + + @Test + def testProjectUnionTranspose(): Unit = { + val util = batchTestUtil() + val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c) + val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c) + + val result = left.select('a, 'b, 'c) + .unionAll(right.select('a, 'b, 'c)) + .select('b, 'c) + + val expected = binaryNode( + "DataSetUnion", + unaryNode( + "DataSetCalc", + batchTableNode(0), + term("select", "b", "c") + ), + unaryNode( + "DataSetCalc", + batchTableNode(1), + term("select", "b", "c") + ), + term("union", "b", "c") + ) + + util.verifyTable(result, expected) + + } + + @Test + def testProjectMinusTranspose(): Unit = { + val util = batchTestUtil() + val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c) + val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c) + + val result = left.select('a, 'b, 'c) + .minusAll(right.select('a, 'b, 'c)) + .select('b, 'c) + + val expected = binaryNode( + "DataSetMinus", + unaryNode( + "DataSetCalc", + batchTableNode(0), + term("select", "b", "c") + ), + unaryNode( + "DataSetCalc", + batchTableNode(1), + term("select", "b", "c") + ), + term("minus", "b", "c") + ) + + util.verifyTable(result, expected) + + } } http://git-wip-us.apache.org/repos/asf/flink/blob/0a22acef/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala index b1b700b..c0fc05b 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala @@ -65,4 +65,33 @@ class SetOperatorsTest extends TableTestBase { util.verifyTable(result, expected) } + + @Test + def testProjectUnionTranspose(): Unit = { + val util = streamTestUtil() + val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c) + val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c) + + val result = left.select('a, 'b, 'c) + .unionAll(right.select('a, 'b, 'c)) + .select('b, 'c) + + val expected = binaryNode( + "DataStreamUnion", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "b", "c") + ), + unaryNode( + "DataStreamCalc", + streamTableNode(1), + term("select", "b", "c") + ), + term("union all", "b", "c") + ) + + util.verifyTable(result, expected) + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/0a22acef/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala index 009ae40..faca7f9 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala @@ -209,15 +209,19 @@ class TimeIndicatorConversionTest extends TableTestBase { val result = t.unionAll(t).select('rowtime) - val expected = unaryNode( - "DataStreamCalc", - binaryNode( - "DataStreamUnion", + val expected = binaryNode( + "DataStreamUnion", + unaryNode( + "DataStreamCalc", streamTableNode(0), + term("select", "rowtime") + ), + unaryNode( + "DataStreamCalc", streamTableNode(0), - term("union all", "rowtime", "long", "int") + term("select", "rowtime") ), - term("select", "rowtime") + term("union all", "rowtime") ) util.verifyTable(result, expected)
