Repository: flink Updated Branches: refs/heads/master 55d60615a -> 41d5875bf
[FLINK-5226] [table] Use correct DataSetCostFactory and improve DataSetCalc costs. - Improved DataSetCalc costs make projections cheap and help to push them down. This closes #2926. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/677d0d90 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/677d0d90 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/677d0d90 Branch: refs/heads/master Commit: 677d0d9073952b6f4c745ac242ba4108364f2189 Parents: 55d6061 Author: Fabian Hueske <[email protected]> Authored: Fri Dec 2 15:28:16 2016 +0100 Committer: Fabian Hueske <[email protected]> Committed: Thu Dec 8 18:45:42 2016 +0100 ---------------------------------------------------------------------- .../flink/api/table/FlinkRelBuilder.scala | 2 +- .../table/plan/nodes/dataset/DataSetCalc.scala | 14 ++++++-- .../table/plan/nodes/dataset/DataSetJoin.scala | 17 ++++++---- .../api/scala/batch/sql/SetOperatorsTest.scala | 14 ++++++-- .../api/scala/batch/sql/SingleRowJoinTest.scala | 34 ++++++++++++-------- 5 files changed, 55 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/677d0d90/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala index da44ebb..8508e53 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala @@ -79,7 +79,7 @@ object FlinkRelBuilder { val typeFactory = new FlinkTypeFactory(typeSystem) // create context instances with Flink type factory - val planner = new VolcanoPlanner(Contexts.empty()) + val planner = new VolcanoPlanner(config.getCostFactory, Contexts.empty()) planner.setExecutor(config.getExecutor) planner.addRelTraitDef(ConventionTraitDef.INSTANCE) val cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory)) http://git-wip-us.apache.org/repos/asf/flink/blob/677d0d90/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala index b8b74ad..c0881b7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala @@ -32,6 +32,8 @@ import TypeConverter._ import org.apache.flink.api.table.BatchTableEnvironment import org.apache.calcite.rex._ +import scala.collection.JavaConverters._ + /** * Flink RelNode which matches along with LogicalCalc. * @@ -73,8 +75,16 @@ class DataSetCalc( val child = this.getInput val rowCnt = metadata.getRowCount(child) - val exprCnt = calcProgram.getExprCount - planner.getCostFactory.makeCost(rowCnt, rowCnt * exprCnt, 0) + + // compute number of expressions that do not access a field or literal, i.e. computations, + // conditions, etc. We only want to account for computations, not for simple projections. + val compCnt = calcProgram.getExprList.asScala.toList.count { + case i: RexInputRef => false + case l: RexLiteral => false + case _ => true + } + + planner.getCostFactory.makeCost(rowCnt, rowCnt * compCnt, 0) } override def estimateRowCount(metadata: RelMetadataQuery): Double = { http://git-wip-us.apache.org/repos/asf/flink/blob/677d0d90/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala index 6d7a30e..ccd84ca 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala @@ -88,12 +88,17 @@ class DataSetJoin( override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { - val children = this.getInputs - children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, child) => - val rowCnt = metadata.getRowCount(child) - val rowSize = this.estimateRowSize(child.getRowType) - cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)) - } + val leftRowCnt = metadata.getRowCount(getLeft) + val leftRowSize = estimateRowSize(getLeft.getRowType) + + val rightRowCnt = metadata.getRowCount(getRight) + val rightRowSize = estimateRowSize(getRight.getRowType) + + val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize) + val cpuCost = leftRowCnt + rightRowCnt + val rowCnt = leftRowCnt + rightRowCnt + + planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost) } override def translateToPlan( http://git-wip-us.apache.org/repos/asf/flink/blob/677d0d90/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala index 7b2b497..d0c0400 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala @@ -43,15 +43,23 @@ class SetOperatorsTest extends TableTestBase { "DataSetCalc", binaryNode( "DataSetJoin", - batchTableNode(1), + unaryNode( + "DataSetCalc", + batchTableNode(1), + term("select", "b_long") + ), unaryNode( "DataSetAggregate", - batchTableNode(0), + unaryNode( + "DataSetCalc", + batchTableNode(0), + term("select", "a_long") + ), term("groupBy", "a_long"), term("select", "a_long") ), term("where", "=(a_long, b_long)"), - term("join", "b_long", "b_int", "b_string", "a_long"), + term("join", "b_long", "a_long"), term("joinType", "InnerJoin") ), term("select", "true AS $f0", "a_long") http://git-wip-us.apache.org/repos/asf/flink/blob/677d0d90/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SingleRowJoinTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SingleRowJoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SingleRowJoinTest.scala index f56b9ae..49f61af 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SingleRowJoinTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SingleRowJoinTest.scala @@ -48,20 +48,23 @@ class SingleRowJoinTest extends TableTestBase { "DataSetUnion", unaryNode( "DataSetValues", - batchTableNode(0), - tuples(List(null, null)), - term("values", "a1", "a2") + unaryNode( + "DataSetCalc", + batchTableNode(0), + term("select", "a1") + ), + tuples(List(null)), + term("values", "a1") ), - term("union","a1","a2") + term("union","a1") ), term("select", "COUNT(a1) AS cnt") ), - term("where", "true"), + term("where", "=(CAST(a1), cnt)"), term("join", "a1", "a2", "cnt"), term("joinType", "NestedLoopJoin") ), - term("select", "a1", "a2"), - term("where", "=(CAST(a1), cnt)") + term("select", "a1", "a2") ) util.verifySql(query, expected) @@ -89,20 +92,23 @@ class SingleRowJoinTest extends TableTestBase { "DataSetUnion", unaryNode( "DataSetValues", - batchTableNode(0), - tuples(List(null, null)), - term("values", "a1", "a2") + unaryNode( + "DataSetCalc", + batchTableNode(0), + term("select", "a1") + ), + tuples(List(null)), + term("values", "a1") ), - term("union","a1","a2") + term("union", "a1") ), term("select", "COUNT(a1) AS cnt") ), - term("where", "true"), + term("where", "<(a1, cnt)"), term("join", "a1", "a2", "cnt"), term("joinType", "NestedLoopJoin") ), - term("select", "a1", "a2"), - term("where", "<(a1, cnt)") + term("select", "a1", "a2") ) util.verifySql(query, expected)
