Repository: flink
Updated Branches:
  refs/heads/master 55d60615a -> 41d5875bf


[FLINK-5226] [table] Use correct DataSetCostFactory and improve DataSetCalc costs.

- Improved DataSetCalc costs make projections cheap and help to push them down.

This closes #2926.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/677d0d90
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/677d0d90
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/677d0d90

Branch: refs/heads/master
Commit: 677d0d9073952b6f4c745ac242ba4108364f2189
Parents: 55d6061
Author: Fabian Hueske <[email protected]>
Authored: Fri Dec 2 15:28:16 2016 +0100
Committer: Fabian Hueske <[email protected]>
Committed: Thu Dec 8 18:45:42 2016 +0100

----------------------------------------------------------------------
 .../flink/api/table/FlinkRelBuilder.scala       |  2 +-
 .../table/plan/nodes/dataset/DataSetCalc.scala  | 14 ++++++--
 .../table/plan/nodes/dataset/DataSetJoin.scala  | 17 ++++++----
 .../api/scala/batch/sql/SetOperatorsTest.scala  | 14 ++++++--
 .../api/scala/batch/sql/SingleRowJoinTest.scala | 34 ++++++++++++--------
 5 files changed, 55 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/677d0d90/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
index da44ebb..8508e53 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
@@ -79,7 +79,7 @@ object FlinkRelBuilder {
     val typeFactory = new FlinkTypeFactory(typeSystem)
 
     // create context instances with Flink type factory
-    val planner = new VolcanoPlanner(Contexts.empty())
+    val planner = new VolcanoPlanner(config.getCostFactory, Contexts.empty())
     planner.setExecutor(config.getExecutor)
     planner.addRelTraitDef(ConventionTraitDef.INSTANCE)
     val cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory))

http://git-wip-us.apache.org/repos/asf/flink/blob/677d0d90/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
index b8b74ad..c0881b7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
@@ -32,6 +32,8 @@ import TypeConverter._
 import org.apache.flink.api.table.BatchTableEnvironment
 import org.apache.calcite.rex._
 
+import scala.collection.JavaConverters._
+
 /**
   * Flink RelNode which matches along with LogicalCalc.
   *
@@ -73,8 +75,16 @@ class DataSetCalc(
 
     val child = this.getInput
     val rowCnt = metadata.getRowCount(child)
-    val exprCnt = calcProgram.getExprCount
-    planner.getCostFactory.makeCost(rowCnt, rowCnt * exprCnt, 0)
+
+    // compute number of expressions that do not access a field or literal, i.e. computations,
+    //   conditions, etc. We only want to account for computations, not for simple projections.
+    val compCnt = calcProgram.getExprList.asScala.toList.count {
+      case i: RexInputRef => false
+      case l: RexLiteral => false
+      case _ => true
+    }
+
+    planner.getCostFactory.makeCost(rowCnt, rowCnt * compCnt, 0)
   }
 
   override def estimateRowCount(metadata: RelMetadataQuery): Double = {

http://git-wip-us.apache.org/repos/asf/flink/blob/677d0d90/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
index 6d7a30e..ccd84ca 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
@@ -88,12 +88,17 @@ class DataSetJoin(
 
   override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
 
-    val children = this.getInputs
-    children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, child) =>
-      val rowCnt = metadata.getRowCount(child)
-      val rowSize = this.estimateRowSize(child.getRowType)
-      cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
-    }
+    val leftRowCnt = metadata.getRowCount(getLeft)
+    val leftRowSize = estimateRowSize(getLeft.getRowType)
+
+    val rightRowCnt = metadata.getRowCount(getRight)
+    val rightRowSize = estimateRowSize(getRight.getRowType)
+
+    val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
+    val cpuCost = leftRowCnt + rightRowCnt
+    val rowCnt = leftRowCnt + rightRowCnt
+
+    planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
   }
 
   override def translateToPlan(

http://git-wip-us.apache.org/repos/asf/flink/blob/677d0d90/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
index 7b2b497..d0c0400 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
@@ -43,15 +43,23 @@ class SetOperatorsTest extends TableTestBase {
             "DataSetCalc",
             binaryNode(
               "DataSetJoin",
-              batchTableNode(1),
+              unaryNode(
+                "DataSetCalc",
+                batchTableNode(1),
+                term("select", "b_long")
+              ),
               unaryNode(
                 "DataSetAggregate",
-                batchTableNode(0),
+                unaryNode(
+                  "DataSetCalc",
+                  batchTableNode(0),
+                  term("select", "a_long")
+                ),
                 term("groupBy", "a_long"),
                 term("select", "a_long")
               ),
               term("where", "=(a_long, b_long)"),
-              term("join", "b_long", "b_int", "b_string", "a_long"),
+              term("join", "b_long", "a_long"),
               term("joinType", "InnerJoin")
             ),
             term("select", "true AS $f0", "a_long")

http://git-wip-us.apache.org/repos/asf/flink/blob/677d0d90/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SingleRowJoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SingleRowJoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SingleRowJoinTest.scala
index f56b9ae..49f61af 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SingleRowJoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SingleRowJoinTest.scala
@@ -48,20 +48,23 @@ class SingleRowJoinTest extends TableTestBase {
               "DataSetUnion",
               unaryNode(
                 "DataSetValues",
-                batchTableNode(0),
-                tuples(List(null, null)),
-                term("values", "a1", "a2")
+                unaryNode(
+                  "DataSetCalc",
+                  batchTableNode(0),
+                  term("select", "a1")
+                ),
+                tuples(List(null)),
+                term("values", "a1")
               ),
-              term("union","a1","a2")
+              term("union","a1")
             ),
             term("select", "COUNT(a1) AS cnt")
           ),
-          term("where", "true"),
+          term("where", "=(CAST(a1), cnt)"),
           term("join", "a1", "a2", "cnt"),
           term("joinType", "NestedLoopJoin")
         ),
-        term("select", "a1", "a2"),
-        term("where", "=(CAST(a1), cnt)")
+        term("select", "a1", "a2")
       )
 
     util.verifySql(query, expected)
@@ -89,20 +92,23 @@ class SingleRowJoinTest extends TableTestBase {
               "DataSetUnion",
               unaryNode(
                 "DataSetValues",
-                batchTableNode(0),
-                tuples(List(null, null)),
-                term("values", "a1", "a2")
+                unaryNode(
+                  "DataSetCalc",
+                  batchTableNode(0),
+                  term("select", "a1")
+                ),
+                tuples(List(null)),
+                term("values", "a1")
               ),
-              term("union","a1","a2")
+              term("union", "a1")
             ),
             term("select", "COUNT(a1) AS cnt")
           ),
-          term("where", "true"),
+          term("where", "<(a1, cnt)"),
           term("join", "a1", "a2", "cnt"),
           term("joinType", "NestedLoopJoin")
         ),
-        term("select", "a1", "a2"),
-        term("where", "<(a1, cnt)")
+        term("select", "a1", "a2")
       )
 
     util.verifySql(query, expected)

Reply via email to