Repository: flink
Updated Branches:
  refs/heads/master db6528be0 -> 8e036c38e
[FLINK-3735] Make DataSetUnionRule match only for union-all

This closes #1874

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e036c38
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e036c38
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e036c38

Branch: refs/heads/master
Commit: 8e036c38e3fb7e48aa91a09debab3be6a24deefe
Parents: db6528b
Author: vasia <[email protected]>
Authored: Tue Apr 12 15:11:54 2016 +0200
Committer: vasia <[email protected]>
Committed: Wed Apr 13 12:36:04 2016 +0200

----------------------------------------------------------------------
 .../plan/rules/dataSet/DataSetUnionRule.scala | 34 ++++++++++++--------
 1 file changed, 21 insertions(+), 13 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8e036c38/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
index 32400d0..cd1de1e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
@@ -18,7 +18,7 @@

 package org.apache.flink.api.table.plan.rules.dataSet

-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.logical.LogicalUnion
@@ -32,21 +32,29 @@ class DataSetUnionRule
     "FlinkUnionRule")
   {

-    def convert(rel: RelNode): RelNode = {
+  /**
+   * Only translate UNION ALL
+   */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val union: LogicalUnion = call.rel(0).asInstanceOf[LogicalUnion]
+    union.all
+  }
+
+  def convert(rel: RelNode): RelNode = {

-      val union: LogicalUnion = rel.asInstanceOf[LogicalUnion]
-      val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-      val convLeft: RelNode = RelOptRule.convert(union.getInput(0), DataSetConvention.INSTANCE)
-      val convRight: RelNode = RelOptRule.convert(union.getInput(1), DataSetConvention.INSTANCE)
+    val union: LogicalUnion = rel.asInstanceOf[LogicalUnion]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val convLeft: RelNode = RelOptRule.convert(union.getInput(0), DataSetConvention.INSTANCE)
+    val convRight: RelNode = RelOptRule.convert(union.getInput(1), DataSetConvention.INSTANCE)

-      new DataSetUnion(
-        rel.getCluster,
-        traitSet,
-        convLeft,
-        convRight,
-        rel.getRowType)
-    }
+    new DataSetUnion(
+      rel.getCluster,
+      traitSet,
+      convLeft,
+      convRight,
+      rel.getRowType)
   }
+}

 object DataSetUnionRule {
   val INSTANCE: RelOptRule = new DataSetUnionRule
