Repository: flink
Updated Branches:
  refs/heads/master a062f4b8e -> b1913a4af


[FLINK-5394] [table] Fix row count estimation

This closes #3058.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b1913a4a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b1913a4a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b1913a4a

Branch: refs/heads/master
Commit: b1913a4af5ff5794629e66e2f4b0d26054d3fc29
Parents: a062f4b
Author: beyond1920 <beyond1...@126.com>
Authored: Thu Dec 29 15:52:17 2016 +0800
Committer: twalthr <twal...@apache.org>
Committed: Tue Jan 17 12:53:25 2017 +0100

----------------------------------------------------------------------
 .../flink/table/calcite/FlinkRelBuilder.scala   |  7 ++++
 .../cost/FlinkDefaultRelMetadataProvider.scala  | 32 ++++++++++++++++++
 .../table/plan/cost/FlinkRelMdRowCount.scala    | 35 ++++++++++++++++++++
 .../table/plan/nodes/dataset/DataSetSort.scala  | 16 +++++++++
 4 files changed, 90 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b1913a4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
index 8465ec6..aabf4c9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
@@ -27,11 +27,13 @@ import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.plan._
 import org.apache.calcite.prepare.CalciteCatalogReader
 import org.apache.calcite.rel.logical.LogicalAggregate
+import org.apache.calcite.rel.metadata.{JaninoRelMetadataProvider, 
RelMetadataQuery}
 import org.apache.calcite.rex.RexBuilder
 import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey}
 import org.apache.calcite.tools.{FrameworkConfig, RelBuilder}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.expressions.WindowProperty
+import org.apache.flink.table.plan.cost.FlinkDefaultRelMetadataProvider
 import org.apache.flink.table.plan.logical.LogicalWindow
 import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
 
@@ -83,6 +85,11 @@ object FlinkRelBuilder {
     planner.setExecutor(config.getExecutor)
     planner.addRelTraitDef(ConventionTraitDef.INSTANCE)
     val cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory))
+    cluster.setMetadataProvider(FlinkDefaultRelMetadataProvider.INSTANCE)
+    // just set metadataProvider is not enough, see
+    // https://www.mail-archive.com/dev@calcite.apache.org/msg00930.html
+    RelMetadataQuery.THREAD_PROVIDERS.set(
+      JaninoRelMetadataProvider.of(cluster.getMetadataProvider))
     val calciteSchema = CalciteSchema.from(config.getDefaultSchema)
     val relOptSchema = new CalciteCatalogReader(
       calciteSchema,

http://git-wip-us.apache.org/repos/asf/flink/blob/b1913a4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkDefaultRelMetadataProvider.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkDefaultRelMetadataProvider.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkDefaultRelMetadataProvider.scala
new file mode 100644
index 0000000..0f2b7d5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkDefaultRelMetadataProvider.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.cost
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.rel.metadata.{ChainedRelMetadataProvider, 
DefaultRelMetadataProvider, RelMetadataProvider}
+
+/**
+ * Relational metadata provider for Flink's planner.
+ *
+ * It chains Flink's own row-count handler ([[FlinkRelMdRowCount.SOURCE]]) in front of
+ * Calcite's [[DefaultRelMetadataProvider]]. Calcite's chained provider follows
+ * chain-of-responsibility order, so the Flink handler is listed first and is intended to
+ * take precedence for the node types it covers.
+ */
+object FlinkDefaultRelMetadataProvider {
+
+  /** Providers in chain order: Flink-specific handlers first, Calcite defaults last. */
+  private val chain: ImmutableList[RelMetadataProvider] = ImmutableList.of(
+    FlinkRelMdRowCount.SOURCE,
+    DefaultRelMetadataProvider.INSTANCE)
+
+  val INSTANCE: RelMetadataProvider = ChainedRelMetadataProvider.of(chain)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b1913a4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkRelMdRowCount.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkRelMdRowCount.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkRelMdRowCount.scala
new file mode 100644
index 0000000..e6ea099
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkRelMdRowCount.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.cost
+
+import org.apache.calcite.rel.metadata.{ReflectiveRelMetadataProvider, 
RelMdRowCount, RelMetadataProvider, RelMetadataQuery}
+import org.apache.calcite.util.BuiltInMethod
+import org.apache.flink.table.plan.nodes.dataset.{DataSetCalc, DataSetSort}
+import java.lang.Double
+
+/**
+ * Row-count metadata handler for Flink's [[DataSetCalc]] and [[DataSetSort]] nodes.
+ *
+ * Each overload returns the node's own `estimateRowCount`. This is intended to override
+ * what the inherited [[RelMdRowCount]] handlers would report for these nodes.
+ * NOTE(review): presumably that is a generic SingleRel handler that just forwards the
+ * input's row count; confirm against the Calcite version in use.
+ *
+ * `Double` in this file is `java.lang.Double` (see the import), matching the boxed
+ * return type Calcite's metadata handlers use.
+ */
+object FlinkRelMdRowCount extends RelMdRowCount {
+
+  /** Reflective provider that exposes this object's `getRowCount` overloads for ROW_COUNT. */
+  val SOURCE: RelMetadataProvider =
+    ReflectiveRelMetadataProvider.reflectiveSource(BuiltInMethod.ROW_COUNT.method, this)
+
+  /** Row count of a [[DataSetCalc]], boxed from the node's own estimate. */
+  def getRowCount(rel: DataSetCalc, mq: RelMetadataQuery): Double =
+    Double.valueOf(rel.estimateRowCount(mq))
+
+  /** Row count of a [[DataSetSort]], boxed from the node's own estimate. */
+  def getRowCount(rel: DataSetSort, mq: RelMetadataQuery): Double =
+    Double.valueOf(rel.estimateRowCount(mq))
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b1913a4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
index 428ea84..4d84730 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
@@ -23,6 +23,7 @@ import java.util
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelFieldCollation.Direction
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rex.{RexLiteral, RexNode}
 import org.apache.flink.api.common.operators.Order
@@ -71,6 +72,21 @@ class DataSetSort(
     )
   }
 
+  /**
+   * Estimates how many rows this sort emits.
+   *
+   * The estimate is the input's row count minus `limitStart`, floored at 1.0. When `fetch`
+   * is set, the result is also capped at the fetch value.
+   *
+   * @param metadata metadata query used to obtain the input's row count
+   * @return the estimated output row count (never null; 1.0 when the input count is unknown)
+   */
+  override def estimateRowCount(metadata: RelMetadataQuery): Double = {
+    val inputRowCnt = metadata.getRowCount(this.getInput)
+    if (inputRowCnt == null) {
+      // getRowCount returns a boxed java.lang.Double, and null means "unknown".
+      // Returning it here would implicitly unbox null into the primitive result and throw
+      // a NullPointerException. Fall back to Calcite's "no information" default instead
+      // (AbstractRelNode.estimateRowCount returns 1.0).
+      1.0
+    } else {
+      // Drop the rows skipped at the start (limitStart, presumably the OFFSET),
+      // but keep the estimate at one row or more.
+      val rowCount = (inputRowCnt - limitStart).max(1.0)
+      if (fetch != null) {
+        val limit = RexLiteral.intValue(fetch)
+        rowCount.min(limit)
+      } else {
+        rowCount
+      }
+    }
+  }
+
   override def translateToPlan(
       tableEnv: BatchTableEnvironment,
       expectedType: Option[TypeInformation[Any]] = None)

Reply via email to