Repository: flink Updated Branches: refs/heads/master a062f4b8e -> b1913a4af
[FLINK-5394] [table] Fix row count estimation This closes #3058. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b1913a4a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b1913a4a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b1913a4a Branch: refs/heads/master Commit: b1913a4af5ff5794629e66e2f4b0d26054d3fc29 Parents: a062f4b Author: beyond1920 <beyond1...@126.com> Authored: Thu Dec 29 15:52:17 2016 +0800 Committer: twalthr <twal...@apache.org> Committed: Tue Jan 17 12:53:25 2017 +0100 ---------------------------------------------------------------------- .../flink/table/calcite/FlinkRelBuilder.scala | 7 ++++ .../cost/FlinkDefaultRelMetadataProvider.scala | 32 ++++++++++++++++++ .../table/plan/cost/FlinkRelMdRowCount.scala | 35 ++++++++++++++++++++ .../table/plan/nodes/dataset/DataSetSort.scala | 16 +++++++++ 4 files changed, 90 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b1913a4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala index 8465ec6..aabf4c9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala @@ -27,11 +27,13 @@ import org.apache.calcite.jdbc.CalciteSchema import org.apache.calcite.plan._ import org.apache.calcite.prepare.CalciteCatalogReader import org.apache.calcite.rel.logical.LogicalAggregate +import org.apache.calcite.rel.metadata.{JaninoRelMetadataProvider, RelMetadataQuery} import 
org.apache.calcite.rex.RexBuilder import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey} import org.apache.calcite.tools.{FrameworkConfig, RelBuilder} import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.expressions.WindowProperty +import org.apache.flink.table.plan.cost.FlinkDefaultRelMetadataProvider import org.apache.flink.table.plan.logical.LogicalWindow import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate @@ -83,6 +85,11 @@ object FlinkRelBuilder { planner.setExecutor(config.getExecutor) planner.addRelTraitDef(ConventionTraitDef.INSTANCE) val cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory)) + cluster.setMetadataProvider(FlinkDefaultRelMetadataProvider.INSTANCE) + // just setting the metadataProvider is not enough, see + // https://www.mail-archive.com/dev@calcite.apache.org/msg00930.html + RelMetadataQuery.THREAD_PROVIDERS.set( + JaninoRelMetadataProvider.of(cluster.getMetadataProvider)) val calciteSchema = CalciteSchema.from(config.getDefaultSchema) val relOptSchema = new CalciteCatalogReader( calciteSchema, http://git-wip-us.apache.org/repos/asf/flink/blob/b1913a4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkDefaultRelMetadataProvider.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkDefaultRelMetadataProvider.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkDefaultRelMetadataProvider.scala new file mode 100644 index 0000000..0f2b7d5 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkDefaultRelMetadataProvider.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.cost + +import com.google.common.collect.ImmutableList +import org.apache.calcite.rel.metadata.{ChainedRelMetadataProvider, DefaultRelMetadataProvider, RelMetadataProvider} + +object FlinkDefaultRelMetadataProvider { + + val INSTANCE: RelMetadataProvider = ChainedRelMetadataProvider.of( + ImmutableList.of( + FlinkRelMdRowCount.SOURCE, + DefaultRelMetadataProvider.INSTANCE + ) + ) +} http://git-wip-us.apache.org/repos/asf/flink/blob/b1913a4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkRelMdRowCount.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkRelMdRowCount.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkRelMdRowCount.scala new file mode 100644 index 0000000..e6ea099 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkRelMdRowCount.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.cost + +import org.apache.calcite.rel.metadata.{ReflectiveRelMetadataProvider, RelMdRowCount, RelMetadataProvider, RelMetadataQuery} +import org.apache.calcite.util.BuiltInMethod +import org.apache.flink.table.plan.nodes.dataset.{DataSetCalc, DataSetSort} +import java.lang.Double + +object FlinkRelMdRowCount extends RelMdRowCount { + + val SOURCE: RelMetadataProvider = ReflectiveRelMetadataProvider.reflectiveSource( + BuiltInMethod.ROW_COUNT.method, + this) + + def getRowCount(rel: DataSetCalc, mq: RelMetadataQuery): Double = rel.estimateRowCount(mq) + + def getRowCount(rel: DataSetSort, mq: RelMetadataQuery): Double = rel.estimateRowCount(mq) +} http://git-wip-us.apache.org/repos/asf/flink/blob/b1913a4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala index 428ea84..4d84730 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala @@ -23,6 
+23,7 @@ import java.util import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.RelFieldCollation.Direction import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel} import org.apache.calcite.rex.{RexLiteral, RexNode} import org.apache.flink.api.common.operators.Order @@ -71,6 +72,21 @@ class DataSetSort( ) } + override def estimateRowCount(metadata: RelMetadataQuery): Double = { + val inputRowCnt = metadata.getRowCount(this.getInput) + if (inputRowCnt == null) { + inputRowCnt + } else { + val rowCount = (inputRowCnt - limitStart).max(1.0) + if (fetch != null) { + val limit = RexLiteral.intValue(fetch) + rowCount.min(limit) + } else { + rowCount + } + } + } + override def translateToPlan( tableEnv: BatchTableEnvironment, expectedType: Option[TypeInformation[Any]] = None)