[FLINK-3596] Finalized DataSet RelNode refactoring
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d720b002 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d720b002 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d720b002 Branch: refs/heads/tableOnCalcite Commit: d720b002a77d4ac3a2e41bfaeaf6ad87c2e415c0 Parents: 22621e0 Author: Fabian Hueske <[email protected]> Authored: Thu Mar 10 23:30:06 2016 +0100 Committer: Fabian Hueske <[email protected]> Committed: Thu Mar 10 23:30:43 2016 +0100 ---------------------------------------------------------------------- .../plan/nodes/dataset/DataSetAggregate.scala | 7 +- .../table/plan/nodes/dataset/DataSetUnion.scala | 2 +- .../api/table/plan/rules/FlinkRuleSets.scala | 12 +- .../rules/dataSet/DataSetAggregateRule.scala | 55 +++++++ .../plan/rules/dataSet/DataSetCalcRule.scala | 53 ++++++ .../plan/rules/dataSet/DataSetJoinRule.scala | 115 +++++++++++++ .../plan/rules/dataSet/DataSetScanRule.scala | 50 ++++++ .../plan/rules/dataSet/DataSetUnionRule.scala | 54 ++++++ .../dataSet/FlinkJoinUnionTransposeRule.scala | 110 +++++++++++++ .../plan/rules/logical/FlinkAggregateRule.scala | 55 ------- .../plan/rules/logical/FlinkCalcRule.scala | 53 ------ .../plan/rules/logical/FlinkJoinRule.scala | 115 ------------- .../logical/FlinkJoinUnionTransposeRule.scala | 110 ------------- .../plan/rules/logical/FlinkScanRule.scala | 52 ------ .../plan/rules/logical/FlinkUnionRule.scala | 54 ------ .../table/runtime/aggregate/AggregateUtil.scala | 11 +- .../test/GroupedAggregationsITCase.scala.orig | 164 ------------------- 17 files changed, 450 insertions(+), 622 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala index 3856c5f..9a9bf99 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala @@ -24,7 +24,6 @@ import org.apache.calcite.rel.core.AggregateCall import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.plan.TypeConverter._ import org.apache.flink.api.table.plan.{PlanGenException, TypeConverter} import org.apache.flink.api.table.runtime.aggregate.AggregateUtil import org.apache.flink.api.table.runtime.aggregate.AggregateUtil.CalcitePair @@ -76,7 +75,7 @@ class DataSetAggregate( case _ => // ok } - val groupingKeys = (0 until grouping.length).toArray + val groupingKeys = grouping.indices.toArray // add grouping fields, position keys in the input, and input type val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates, inputType, rowType, grouping, config) @@ -93,8 +92,8 @@ class DataSetAggregate( .toArray val rowTypeInfo = new RowTypeInfo(fieldTypes) - val mappedInput = inputDS.map(aggregateResult.mapFunc) - val groupReduceFunction = aggregateResult.reduceGroupFunc + val mappedInput = inputDS.map(aggregateResult._1) + val groupReduceFunction = aggregateResult._2 if (groupingKeys.length > 0) { mappedInput.asInstanceOf[DataSet[Row]] http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala index 462c4a5..a52a65e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala @@ -23,7 +23,7 @@ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.{RelWriter, BiRel, RelNode} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.{TableConfig, Row} +import org.apache.flink.api.table.TableConfig /** * Flink RelNode which matches along with UnionOperator. http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala index bd128b2..d8cc2b3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala @@ -20,7 +20,7 @@ package org.apache.flink.api.table.plan.rules import org.apache.calcite.rel.rules._ import org.apache.calcite.tools.{RuleSets, RuleSet} -import org.apache.flink.api.table.plan.rules.logical._ +import org.apache.flink.api.table.plan.rules.dataSet._ object FlinkRuleSets { @@ -94,11 +94,11 @@ object FlinkRuleSets { CalcMergeRule.INSTANCE, // translate to logical Flink nodes - FlinkAggregateRule.INSTANCE, - FlinkCalcRule.INSTANCE, - FlinkJoinRule.INSTANCE, - FlinkScanRule.INSTANCE, - FlinkUnionRule.INSTANCE + DataSetAggregateRule.INSTANCE, + DataSetCalcRule.INSTANCE, + DataSetJoinRule.INSTANCE, + DataSetScanRule.INSTANCE, + DataSetUnionRule.INSTANCE ) } http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala new file mode 100644 index 0000000..40afd4a --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.LogicalAggregate +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention} +import scala.collection.JavaConversions._ + +class DataSetAggregateRule + extends ConverterRule( + classOf[LogicalAggregate], + Convention.NONE, + DataSetConvention.INSTANCE, + "FlinkAggregateRule") + { + + def convert(rel: RelNode): RelNode = { + val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate] + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) + val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE) + + new DataSetAggregate( + rel.getCluster, + traitSet, + convInput, + agg.getNamedAggCalls, + rel.getRowType, + agg.getInput.getRowType, + agg.toString, + agg.getGroupSet.toArray) + } + } + +object DataSetAggregateRule { + val INSTANCE: RelOptRule = new DataSetAggregateRule +} http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala new file mode 100644 index 0000000..3610819 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.LogicalCalc +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetCalc, DataSetConvention} + +class DataSetCalcRule + extends ConverterRule( + classOf[LogicalCalc], + Convention.NONE, + DataSetConvention.INSTANCE, + "FlinkCalcRule") + { + + def convert(rel: RelNode): RelNode = { + val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc] + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) + val convInput: RelNode = RelOptRule.convert(calc.getInput, DataSetConvention.INSTANCE) + + new DataSetCalc( + rel.getCluster, + traitSet, + convInput, + rel.getRowType, + calc.getProgram, + calc.toString, + description) + } + } + +object DataSetCalcRule { + val INSTANCE: RelOptRule = new DataSetCalcRule +} http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala new file mode 100644 index 0000000..06f7d51 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.LogicalJoin +import org.apache.calcite.rex.{RexInputRef, RexCall} +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.flink.api.java.operators.join.JoinType +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetJoin, DataSetConvention} +import scala.collection.JavaConverters._ +import scala.collection.JavaConversions._ + +class DataSetJoinRule + extends ConverterRule( + classOf[LogicalJoin], + Convention.NONE, + DataSetConvention.INSTANCE, + "FlinkJoinRule") + { + + override def matches(call: RelOptRuleCall): Boolean = { + + val join = call.rel(0).asInstanceOf[LogicalJoin] + val children = join.getInputs + val rexBuilder = call.builder().getRexBuilder + + val joinInfo = join.analyzeCondition() + val joinCondition = join.getCondition + val equiCondition = + joinInfo.getEquiCondition(children.get(0), children.get(1), rexBuilder) + + // joins require at least one equi-condition + if (equiCondition.isAlwaysTrue) { + false + } + else { + // check that all equality predicates refer to field refs only (not computed expressions) + // Note: Calcite treats equality predicates on expressions as non-equi predicates + joinCondition match { + + // conjunction of join predicates + case c: RexCall if c.getOperator.equals(SqlStdOperatorTable.AND) => + + c.getOperands.asScala + // look at equality predicates only + .filter { o => + o.isInstanceOf[RexCall] && + o.asInstanceOf[RexCall].getOperator.equals(SqlStdOperatorTable.EQUALS) + } + // check that both children are field references + .map { o => + o.asInstanceOf[RexCall].getOperands.get(0).isInstanceOf[RexInputRef] && + o.asInstanceOf[RexCall].getOperands.get(1).isInstanceOf[RexInputRef] + } + // any equality predicate that does not refer to a field reference? + .reduce( (a, b) => a && b) + + // single equi-join predicate + case c: RexCall if c.getOperator.equals(SqlStdOperatorTable.EQUALS) => + c.getOperands.get(0).isInstanceOf[RexInputRef] && + c.getOperands.get(1).isInstanceOf[RexInputRef] + case _ => + false + } + } + + } + + def convert(rel: RelNode): RelNode = { + + val join: LogicalJoin = rel.asInstanceOf[LogicalJoin] + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) + val convLeft: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE) + val convRight: RelNode = RelOptRule.convert(join.getInput(1), DataSetConvention.INSTANCE) + val joinInfo = join.analyzeCondition + + new DataSetJoin( + rel.getCluster, + traitSet, + convLeft, + convRight, + rel.getRowType, + join.toString, + join.getCondition, + join.getRowType, + joinInfo, + joinInfo.pairs.toList, + JoinType.INNER, + null, + description) + } + } + +object DataSetJoinRule { + val INSTANCE: RelOptRule = new DataSetJoinRule +} http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala new file mode 100644 index 0000000..2865d9f --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rel.logical.LogicalTableScan +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSource} + +class DataSetScanRule + extends ConverterRule( + classOf[LogicalTableScan], + Convention.NONE, + DataSetConvention.INSTANCE, + "FlinkScanRule") + { + def convert(rel: RelNode): RelNode = { + val scan: TableScan = rel.asInstanceOf[TableScan] + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) + + new DataSetSource( + rel.getCluster, + traitSet, + scan.getTable, + rel.getRowType + ) + } + } + +object DataSetScanRule { + val INSTANCE: RelOptRule = new DataSetScanRule +} http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala new file mode 100644 index 0000000..6ab64c6 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.LogicalUnion +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetUnion} + +class DataSetUnionRule + extends ConverterRule( + classOf[LogicalUnion], + Convention.NONE, + DataSetConvention.INSTANCE, + "FlinkUnionRule") + { + + def convert(rel: RelNode): RelNode = { + + val union: LogicalUnion = rel.asInstanceOf[LogicalUnion] + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) + val convLeft: RelNode = RelOptRule.convert(union.getInput(0), DataSetConvention.INSTANCE) + val convRight: RelNode = RelOptRule.convert(union.getInput(1), DataSetConvention.INSTANCE) + + new DataSetUnion( + rel.getCluster, + traitSet, + convLeft, + convRight, + rel.getRowType, + union.toString) + } + } + +object DataSetUnionRule { + val INSTANCE: RelOptRule = new DataSetUnionRule +} http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/FlinkJoinUnionTransposeRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/FlinkJoinUnionTransposeRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/FlinkJoinUnionTransposeRule.scala new file mode 100644 index 0000000..32b53e2 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/FlinkJoinUnionTransposeRule.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.RelOptRule.{any, operand, convert => convertTrait} +import org.apache.calcite.plan.RelOptRule +import org.apache.calcite.plan.RelOptRuleOperand +import org.apache.calcite.plan.RelOptRuleCall +import org.apache.calcite.rel.RelNode +import java.util.ArrayList +import scala.collection.JavaConversions._ +import org.apache.calcite.rel.logical.LogicalJoin +import org.apache.calcite.rel.logical.LogicalUnion +import org.apache.calcite.rel.core.Join +import org.apache.calcite.rel.core.Union + +/** + * This rule is a copy of Calcite's JoinUnionTransposeRule. + * Calcite's implementation checks whether one of the operands is a LogicalUnion, + * which fails in our case, when it matches with a DataSetUnion. + * This rule changes this check to match Union, instead of LogicalUnion only. + * The rest of the rule's logic has not been changed. + */ +class FlinkJoinUnionTransposeRule( + operand: RelOptRuleOperand, + description: String) extends RelOptRule(operand, description) { + + override def onMatch(call: RelOptRuleCall): Unit = { + val join = call.rel(0).asInstanceOf[Join] + val (unionRel: Union, otherInput: RelNode, unionOnLeft: Boolean) = { + if (call.rel(1).isInstanceOf[Union]) { + (call.rel(1).asInstanceOf[Union], call.rel(2).asInstanceOf[RelNode], true) + } + else { + (call.rel(2).asInstanceOf[Union], call.rel(1).asInstanceOf[RelNode], false) + } + } + + if (!unionRel.all) { + return + } + if (!join.getVariablesStopped.isEmpty) { + return + } + // The UNION ALL cannot be on the null generating side + // of an outer join (otherwise we might generate incorrect + // rows for the other side for join keys which lack a match + // in one or both branches of the union) + if (unionOnLeft) { + if (join.getJoinType.generatesNullsOnLeft) { + return + } + } + else { + if (join.getJoinType.generatesNullsOnRight) { + return + } + } + val newUnionInputs = new ArrayList[RelNode] + for (input <- unionRel.getInputs) { + val (joinLeft: RelNode, joinRight: RelNode) = { + if (unionOnLeft) { + (input, otherInput) + } + else { + (otherInput, input) + } + } + + newUnionInputs.add( + join.copy( + join.getTraitSet, + join.getCondition, + joinLeft, + joinRight, + join.getJoinType, + join.isSemiJoinDone)) + } + val newUnionRel = unionRel.copy(unionRel.getTraitSet, newUnionInputs, true) + call.transformTo(newUnionRel) + } +} + +object FlinkJoinUnionTransposeRule { + val LEFT_UNION = new FlinkJoinUnionTransposeRule( + operand(classOf[LogicalJoin], operand(classOf[LogicalUnion], any), + operand(classOf[RelNode], any)), + "JoinUnionTransposeRule(Union-Other)") + + val RIGHT_UNION = new FlinkJoinUnionTransposeRule( + operand(classOf[LogicalJoin], operand(classOf[RelNode], any), + operand(classOf[LogicalUnion], any)), + "JoinUnionTransposeRule(Other-Union)") +} http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala deleted file mode 100644 index 01a5130..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.logical - -import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.logical.LogicalAggregate -import org.apache.flink.api.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention} -import scala.collection.JavaConversions._ - -class FlinkAggregateRule - extends ConverterRule( - classOf[LogicalAggregate], - Convention.NONE, - DataSetConvention.INSTANCE, - "FlinkAggregateRule") - { - - def convert(rel: RelNode): RelNode = { - val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE) - - new DataSetAggregate( - rel.getCluster, - traitSet, - convInput, - agg.getNamedAggCalls, - rel.getRowType, - agg.getInput.getRowType, - agg.toString, - agg.getGroupSet.toArray) - } - } - -object FlinkAggregateRule { - val INSTANCE: RelOptRule = new FlinkAggregateRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala deleted file mode 100644 index f5e9c68..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.logical - -import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.logical.LogicalCalc -import org.apache.flink.api.table.plan.nodes.dataset.{DataSetCalc, DataSetConvention} - -class FlinkCalcRule - extends ConverterRule( - classOf[LogicalCalc], - Convention.NONE, - DataSetConvention.INSTANCE, - "FlinkCalcRule") - { - - def convert(rel: RelNode): RelNode = { - val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - val convInput: RelNode = RelOptRule.convert(calc.getInput, DataSetConvention.INSTANCE) - - new DataSetCalc( - rel.getCluster, - traitSet, - convInput, - rel.getRowType, - calc.getProgram, - calc.toString, - description) - } - } - -object FlinkCalcRule { - val INSTANCE: RelOptRule = new FlinkCalcRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala deleted file mode 100644 index c8ce944..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.logical - -import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.logical.LogicalJoin -import org.apache.calcite.rex.{RexInputRef, RexCall} -import org.apache.calcite.sql.fun.SqlStdOperatorTable -import org.apache.flink.api.java.operators.join.JoinType -import org.apache.flink.api.table.plan.nodes.dataset.{DataSetJoin, DataSetConvention} -import scala.collection.JavaConverters._ -import scala.collection.JavaConversions._ - -class FlinkJoinRule - extends ConverterRule( - classOf[LogicalJoin], - Convention.NONE, - DataSetConvention.INSTANCE, - "FlinkJoinRule") - { - - override def matches(call: RelOptRuleCall): Boolean = { - - val join = call.rel(0).asInstanceOf[LogicalJoin] - val children = join.getInputs - val rexBuilder = call.builder().getRexBuilder - - val joinInfo = join.analyzeCondition() - val joinCondition = join.getCondition - val equiCondition = - joinInfo.getEquiCondition(children.get(0), children.get(1), rexBuilder) - - // joins require at least one equi-condition - if (equiCondition.isAlwaysTrue) { - false - } - else { - // check that all equality predicates refer to field refs only (not computed expressions) - // Note: Calcite treats equality predicates on expressions as non-equi predicates - joinCondition match { - - // conjunction of join predicates - case c: RexCall if c.getOperator.equals(SqlStdOperatorTable.AND) => - - c.getOperands.asScala - // look at equality predicates only - .filter { o => - o.isInstanceOf[RexCall] && - o.asInstanceOf[RexCall].getOperator.equals(SqlStdOperatorTable.EQUALS) - } - // check that both children are field references - .map { o => - o.asInstanceOf[RexCall].getOperands.get(0).isInstanceOf[RexInputRef] && - o.asInstanceOf[RexCall].getOperands.get(1).isInstanceOf[RexInputRef] - } - // any equality predicate that does not refer to a field reference? - .reduce( (a, b) => a && b) - - // single equi-join predicate - case c: RexCall if c.getOperator.equals(SqlStdOperatorTable.EQUALS) => - c.getOperands.get(0).isInstanceOf[RexInputRef] && - c.getOperands.get(1).isInstanceOf[RexInputRef] - case _ => - false - } - } - - } - - def convert(rel: RelNode): RelNode = { - - val join: LogicalJoin = rel.asInstanceOf[LogicalJoin] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - val convLeft: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE) - val convRight: RelNode = RelOptRule.convert(join.getInput(1), DataSetConvention.INSTANCE) - val joinInfo = join.analyzeCondition - - new DataSetJoin( - rel.getCluster, - traitSet, - convLeft, - convRight, - rel.getRowType, - join.toString, - join.getCondition, - join.getRowType, - joinInfo, - joinInfo.pairs.toList, - JoinType.INNER, - null, - description) - } - } - -object FlinkJoinRule { - val INSTANCE: RelOptRule = new FlinkJoinRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinUnionTransposeRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinUnionTransposeRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinUnionTransposeRule.scala deleted file mode 100644 index ff3ee8f..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinUnionTransposeRule.scala +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.logical - -import org.apache.calcite.plan.RelOptRule.{any, operand, convert => convertTrait} -import org.apache.calcite.plan.RelOptRule -import org.apache.calcite.plan.RelOptRuleOperand -import org.apache.calcite.plan.RelOptRuleCall -import org.apache.calcite.rel.RelNode -import java.util.ArrayList -import scala.collection.JavaConversions._ -import org.apache.calcite.rel.logical.LogicalJoin -import org.apache.calcite.rel.logical.LogicalUnion -import org.apache.calcite.rel.core.Join -import org.apache.calcite.rel.core.Union - -/** - * This rule is a copy of Calcite's JoinUnionTransposeRule. - * Calcite's implementation checks whether one of the operands is a LogicalUnion, - * which fails in our case, when it matches with a DataSetUnion. - * This rule changes this check to match Union, instead of LogicalUnion only. - * The rest of the rule's logic has not been changed. - */ -class FlinkJoinUnionTransposeRule( - operand: RelOptRuleOperand, - description: String) extends RelOptRule(operand, description) { - - override def onMatch(call: RelOptRuleCall): Unit = { - val join = call.rel(0).asInstanceOf[Join] - val (unionRel: Union, otherInput: RelNode, unionOnLeft: Boolean) = { - if (call.rel(1).isInstanceOf[Union]) { - (call.rel(1).asInstanceOf[Union], call.rel(2).asInstanceOf[RelNode], true) - } - else { - (call.rel(2).asInstanceOf[Union], call.rel(1).asInstanceOf[RelNode], false) - } - } - - if (!unionRel.all) { - return - } - if (!join.getVariablesStopped.isEmpty) { - return - } - // The UNION ALL cannot be on the null generating side - // of an outer join (otherwise we might generate incorrect - // rows for the other side for join keys which lack a match - // in one or both branches of the union) - if (unionOnLeft) { - if (join.getJoinType.generatesNullsOnLeft) { - return - } - } - else { - if (join.getJoinType.generatesNullsOnRight) { - return - } - } - val newUnionInputs = new ArrayList[RelNode] - for (input <- unionRel.getInputs) { - val (joinLeft: RelNode, joinRight: RelNode) = { - if (unionOnLeft) { - (input, otherInput) - } - else { - (otherInput, input) - } - } - - newUnionInputs.add( - join.copy( - join.getTraitSet, - join.getCondition, - joinLeft, - joinRight, - join.getJoinType, - join.isSemiJoinDone)) - } - val newUnionRel = unionRel.copy(unionRel.getTraitSet, newUnionInputs, true) - call.transformTo(newUnionRel) - } -} - -object FlinkJoinUnionTransposeRule { - val LEFT_UNION = new FlinkJoinUnionTransposeRule( - operand(classOf[LogicalJoin], operand(classOf[LogicalUnion], any), - operand(classOf[RelNode], any)), - "JoinUnionTransposeRule(Union-Other)") - - val RIGHT_UNION = new FlinkJoinUnionTransposeRule( - operand(classOf[LogicalJoin], operand(classOf[RelNode], any), - operand(classOf[LogicalUnion], any)), - "JoinUnionTransposeRule(Other-Union)") -} http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala deleted file mode 100644 index 21da504..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.logical - -import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.core.TableScan -import org.apache.calcite.rel.logical.LogicalTableScan -import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSource} -import org.apache.flink.api.table.plan.schema.DataSetTable - -class FlinkScanRule - extends ConverterRule( - classOf[LogicalTableScan], - Convention.NONE, - DataSetConvention.INSTANCE, - "FlinkScanRule") - { - def convert(rel: RelNode): RelNode = { - val scan: TableScan = rel.asInstanceOf[TableScan] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - - new DataSetSource( - rel.getCluster, - traitSet, - scan.getTable, - rel.getRowType - ) - } - } - -object FlinkScanRule { - val INSTANCE: RelOptRule = new FlinkScanRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala deleted file mode 100644 index 11600a2..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.logical - -import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.logical.LogicalUnion -import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetUnion} - -class FlinkUnionRule - extends ConverterRule( - classOf[LogicalUnion], - Convention.NONE, - DataSetConvention.INSTANCE, - "FlinkUnionRule") - { - - def convert(rel: RelNode): RelNode = { - - val union: LogicalUnion = rel.asInstanceOf[LogicalUnion] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - val convLeft: RelNode = RelOptRule.convert(union.getInput(0), DataSetConvention.INSTANCE) - val convRight: RelNode = RelOptRule.convert(union.getInput(1), DataSetConvention.INSTANCE) - - new DataSetUnion( - rel.getCluster, - traitSet, - convLeft, - convRight, - rel.getRowType, - union.toString) - } - } - -object FlinkUnionRule { - val INSTANCE: RelOptRule = new FlinkUnionRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala index 11857df..70d0497 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala @@ -26,7 +26,6 @@ import org.apache.calcite.sql.`type`.SqlTypeName._ import org.apache.calcite.sql.`type`.{SqlTypeFactoryImpl, SqlTypeName} import org.apache.calcite.sql.fun._ import org.apache.flink.api.common.functions.{GroupReduceFunction, MapFunction} -import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.table.plan.{TypeConverter, PlanGenException} import org.apache.flink.api.table.plan.TypeConverter._ import org.apache.flink.api.table.typeinfo.RowTypeInfo @@ -66,7 +65,7 @@ object AggregateUtil { def createOperatorFunctionsForAggregates(namedAggregates: Seq[CalcitePair[AggregateCall, String]], inputType: RelDataType, outputType: RelDataType, groupings: Array[Int], - config: TableConfig): AggregateResult = { + config: TableConfig): (MapFunction[Any, Row], GroupReduceFunction[Row, Row] ) = { val aggregateFunctionsAndFieldIndexes = transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length) @@ -85,7 +84,7 @@ object AggregateUtil { val mapFunction = new AggregateMapFunction[Row, Row]( aggregates, aggFieldIndexes, groupings, - mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Any]] + mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]] // the mapping relation between field index of intermediate aggregate Row and output Row. val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings) @@ -114,7 +113,7 @@ object AggregateUtil { aggOffsetMapping, intermediateRowArity) } - new AggregateResult(mapFunction, reduceGroupFunction) + (mapFunction, reduceGroupFunction) } private def transformToAggregateFunctions( @@ -317,7 +316,3 @@ object AggregateUtil { } } -case class AggregateResult( - val mapFunc: MapFunction[Any, Any], - val reduceGroupFunc: GroupReduceFunction[Row, Row]) { -} http://git-wip-us.apache.org/repos/asf/flink/blob/d720b002/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala.orig ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala.orig b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala.orig deleted file mode 100644 index ad6a641..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala.orig +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.table.test - -import org.apache.flink.api.table.Row -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.table._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.expressions.Literal -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} -import org.junit._ -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -import scala.collection.JavaConverters._ - -@RunWith(classOf[Parameterized]) -class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - - @Test(expected = classOf[IllegalArgumentException]) - def testGroupingOnNonExistentField(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - // must fail. '_foo not a valid field - .groupBy('_foo) - .select('a.avg) - } - - @Test(expected = classOf[IllegalArgumentException]) - def testGroupingInvalidSelection(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - .groupBy('a, 'b) - // must fail. 'c is not a grouping key or aggregation - .select('c) - } - - @Test - def testGroupedAggregate(): Unit = { - - // the grouping key needs to be forwarded to the intermediate DataSet, even - // if we don't want the key in the output - - val env = ExecutionEnvironment.getExecutionEnvironment - val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - .groupBy('b) - .select('b, 'a.sum) - - val expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n" - val results = t.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testGroupingKeyForwardIfNotUsed(): Unit = { - - // the grouping key needs to be forwarded to the intermediate DataSet, even - // if we don't want the key in the output - - val env = ExecutionEnvironment.getExecutionEnvironment - val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - .groupBy('b) - .select('a.sum) - - val expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n" - val results = t.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testGroupNoAggregation(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val t = CollectionDataSets.get3TupleDataSet(env) - .as('a, 'b, 'c) - .groupBy('b) - .select('a.sum as 'd, 'b) - .groupBy('b, 'd) - .select('b) - - val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n" - val results = t.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testGroupedAggregateWithLongKeys(): Unit = { - // This uses very long keys to force serialized comparison. - // With short keys, the normalized key is sufficient. - val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements( - ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2), - ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2), - ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2), - ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2), - ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2), - ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2), - ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2), - ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2), - ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2)) - .rebalance().setParallelism(2).as('a, 'b, 'c) - .groupBy('a, 'b) - .select('c.sum) - - val expected = "10\n" + "8\n" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testGroupedByExpression(): Unit = { - - // verify AggregateProjectPullUpConstantsRule - - val env = ExecutionEnvironment.getExecutionEnvironment - val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - .select('a, 4 as 'four, 'b) - .groupBy('four, 'a) - .select('four, 'b.sum) - - val expected = "4,2\n" + "4,3\n" + "4,5\n" + "4,5\n" + "4,5\n" + "4,6\n" + - "4,6\n" + "4,6\n" + "4,3\n" + "4,4\n" + "4,6\n" + "4,1\n" + "4,4\n" + - "4,4\n" + "4,5\n" + "4,6\n" + "4,2\n" + "4,3\n" + "4,4\n" + "4,5\n" + "4,6\n" - val results = t.toDataSet[Row].collect() - - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testGroupedByExpression2(): Unit = { - - // verify AggregateProjectPullUpConstantsRule - - val env = ExecutionEnvironment.getExecutionEnvironment - val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) - .select('b, 4 as 'four, 'a) - .groupBy('b, 'four) - .select('four, 'a.sum) - - val expected = "4,1\n" + "4,5\n" + "4,15\n" + "4,34\n" + "4,65\n" + "4,111\n" - val results = t.toDataSet[Row].collect() - - TestBaseUtils.compareResultAsText(results.asJava, expected) - } -}
