[FLINK-3225] Enforce translation to DataSetNodes
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3cb76fcb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3cb76fcb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3cb76fcb Branch: refs/heads/tableOnCalcite Commit: 3cb76fcbcc997e975ac7f1589c9f65d83dfd0137 Parents: fe5e406 Author: Fabian Hueske <[email protected]> Authored: Mon Feb 1 23:45:16 2016 +0100 Committer: Fabian Hueske <[email protected]> Committed: Fri Feb 12 11:34:09 2016 +0100 ---------------------------------------------------------------------- .../flink/api/java/table/JavaBatchTranslator.scala | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3cb76fcb/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala index 66bfbe7..7e91190 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala @@ -19,14 +19,14 @@ package org.apache.flink.api.java.table import org.apache.calcite.plan.{RelTraitSet, RelOptUtil} -import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.{RelCollations, RelNode} import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.Programs import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.{DataSet => JavaDataSet} import org.apache.flink.api.table.plan._ import org.apache.flink.api.table.Table -import org.apache.flink.api.table.plan.nodes.dataset.DataSetRel +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetRel} import org.apache.flink.api.table.plan.rules.FlinkRuleSets import org.apache.flink.api.table.plan.schema.DataSetTable @@ -61,20 +61,19 @@ class JavaBatchTranslator extends PlanTranslator { // get the planner for the plan val planner = lPlan.getCluster.getPlanner - // we do not have any special requirements for the output - val outputProps = RelTraitSet.createEmpty() println("-----------") println("Input Plan:") println("-----------") println(RelOptUtil.toString(lPlan)) - + // decorrelate val decorPlan = RelDecorrelator.decorrelateQuery(lPlan) // optimize the logical Flink plan val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES) - val optPlan = optProgram.run(planner, decorPlan, outputProps) + val flinkOutputProps = RelTraitSet.createEmpty() + val optPlan = optProgram.run(planner, decorPlan, flinkOutputProps) println("---------------") println("Optimized Plan:") @@ -83,7 +82,10 @@ class JavaBatchTranslator extends PlanTranslator { // optimize the logical Flink plan val dataSetProgram = Programs.ofRules(FlinkRuleSets.DATASET_TRANS_RULES) - val dataSetPlan = dataSetProgram.run(planner, optPlan, outputProps) + val dataSetOutputProps = RelTraitSet.createEmpty() + .plus(DataSetConvention.INSTANCE) + .plus(RelCollations.of()).simplify() + val dataSetPlan = dataSetProgram.run(planner, optPlan, dataSetOutputProps) println("-------------") println("DataSet Plan:")
