[FLINK-5567] [table] Introduce and migrate current table statistics to FlinkStatistic.
This closes #3197. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ae0fbff7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ae0fbff7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ae0fbff7 Branch: refs/heads/master Commit: ae0fbff76f327c008bdbf02cac0067bab507a04f Parents: d6a97e4 Author: 槿瑜 <[email protected]> Authored: Tue Jan 24 14:57:08 2017 +0800 Committer: Fabian Hueske <[email protected]> Committed: Wed Feb 15 23:53:48 2017 +0100 ---------------------------------------------------------------------- .../flink/table/plan/schema/DataSetTable.scala | 29 +------ .../table/plan/schema/DataStreamTable.scala | 6 +- .../flink/table/plan/schema/FlinkTable.scala | 14 +++- .../table/plan/schema/TableSourceTable.scala | 8 +- .../flink/table/plan/stats/FlinkStatistic.scala | 87 ++++++++++++++++++++ 5 files changed, 113 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ae0fbff7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala index f8c6835..0ce2a87 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala @@ -18,34 +18,13 @@ package org.apache.flink.table.plan.schema -import java.lang.Double -import java.util -import java.util.Collections - -import org.apache.calcite.rel.{RelCollation, RelDistribution} -import org.apache.calcite.schema.Statistic -import org.apache.calcite.util.ImmutableBitSet import
org.apache.flink.api.java.DataSet +import org.apache.flink.table.plan.stats.{FlinkStatistic, TableStats} class DataSetTable[T]( val dataSet: DataSet[T], override val fieldIndexes: Array[Int], - override val fieldNames: Array[String]) - extends FlinkTable[T](dataSet.getType, fieldIndexes, fieldNames) { - - override def getStatistic: Statistic = { - new DefaultDataSetStatistic - } - -} - -class DefaultDataSetStatistic extends Statistic { - - override def getRowCount: Double = 1000d - - override def getCollations: util.List[RelCollation] = Collections.emptyList() - - override def isKey(columns: ImmutableBitSet): Boolean = false - - override def getDistribution: RelDistribution = null + override val fieldNames: Array[String], + override val statistic: FlinkStatistic = FlinkStatistic.of(TableStats(1000L))) + extends FlinkTable[T](dataSet.getType, fieldIndexes, fieldNames, statistic) { } http://git-wip-us.apache.org/repos/asf/flink/blob/ae0fbff7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala index 0355fac..6ce6570 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala @@ -19,10 +19,12 @@ package org.apache.flink.table.plan.schema import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.plan.stats.FlinkStatistic class DataStreamTable[T]( val dataStream: DataStream[T], override val fieldIndexes: Array[Int], - override val fieldNames: Array[String]) - extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames) { + override val fieldNames: 
Array[String], + override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN) + extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames, statistic) { } http://git-wip-us.apache.org/repos/asf/flink/blob/ae0fbff7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala index 971f54f..ea77061 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala @@ -19,16 +19,19 @@ package org.apache.flink.table.plan.schema import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.calcite.schema.Statistic import org.apache.calcite.schema.impl.AbstractTable import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.TableException import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.plan.stats.FlinkStatistic abstract class FlinkTable[T]( val typeInfo: TypeInformation[T], val fieldIndexes: Array[Int], - val fieldNames: Array[String]) + val fieldNames: Array[String], + val statistic: FlinkStatistic) extends AbstractTable { if (fieldIndexes.length != fieldNames.length) { @@ -64,4 +67,11 @@ abstract class FlinkTable[T]( flinkTypeFactory.buildRowDataType(fieldNames, fieldTypes) } + /** + * Returns statistics of current table + * + * @return statistics of current table + */ + override def getStatistic: Statistic = statistic + } 
http://git-wip-us.apache.org/repos/asf/flink/blob/ae0fbff7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala index 4f82f5e..a3851e3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala @@ -19,11 +19,15 @@ package org.apache.flink.table.plan.schema import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.plan.stats.FlinkStatistic import org.apache.flink.table.sources.TableSource /** Table which defines an external table via a [[TableSource]] */ -class TableSourceTable[T](val tableSource: TableSource[T]) +class TableSourceTable[T]( + val tableSource: TableSource[T], + override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN) extends FlinkTable[T]( typeInfo = tableSource.getReturnType, fieldIndexes = TableEnvironment.getFieldIndices(tableSource), - fieldNames = TableEnvironment.getFieldNames(tableSource)) + fieldNames = TableEnvironment.getFieldNames(tableSource), + statistic) http://git-wip-us.apache.org/repos/asf/flink/blob/ae0fbff7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala new file mode 100644 index 0000000..6f4ea00 --- /dev/null +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.stats + +import java.lang.Double +import java.util.{Collections, List} + +import org.apache.calcite.rel.{RelCollation, RelDistribution} +import org.apache.calcite.schema.Statistic +import org.apache.calcite.util.ImmutableBitSet + +/** + * The class provides statistics for a [[org.apache.flink.table.plan.schema.FlinkTable]]. + * + * @param tableStats The table statistics. + */ +class FlinkStatistic(tableStats: Option[TableStats]) extends Statistic { + + /** + * Returns the table statistics. + * + * @return The table statistics + */ + def getTableStats: TableStats = tableStats.getOrElse(null) + + /** + * Returns the stats of the specified the column. + * + * @param columnName The name of the column for which the stats are requested. + * @return The stats of the specified column. + */ + def getColumnStats(columnName: String): ColumnStats = tableStats match { + case Some(tStats) => tStats.colStats.get(columnName) + case None => null + } + + /** + * Returns the number of rows of the table. 
+ * + * @return The number of rows of the table. + */ + override def getRowCount: Double = tableStats match { + case Some(tStats) => tStats.rowCount.toDouble + case None => null + } + + override def getCollations: List[RelCollation] = Collections.emptyList() + + override def isKey(columns: ImmutableBitSet): Boolean = false + + override def getDistribution: RelDistribution = null + +} + +/** + * Methods to create FlinkStatistic. + */ +object FlinkStatistic { + + /** Represents a FlinkStatistic that knows nothing about a table */ + val UNKNOWN: FlinkStatistic = new FlinkStatistic(None) + + /** + * Returns a FlinkStatistic with given table statistics. + * + * @param tableStats The table statistics. + * @return The generated FlinkStatistic + */ + def of(tableStats: TableStats): FlinkStatistic = new FlinkStatistic(Some(tableStats)) + +}
