This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 147022cf7984b8e56d1e01e826d9640674837e68
Author: Jark Wu <[email protected]>
AuthorDate: Mon Jun 10 15:13:47 2019 +0800

    [FLINK-12708][table] Expose getTableStats() interface in TableSource
---
 .../apache/flink/table/sources/TableSource.java    | 10 +++
 .../apache/flink/table/api/TableEnvironment.scala  | 16 ++++-
 .../flink/table/api/batch/TableStatsTest.xml       | 69 +++++++++++++++++++
 .../flink/table/api/batch/TableStatsTest.scala     | 80 ++++++++++++++++++++++
 .../apache/flink/table/util/TableTestBase.scala    | 12 +++-
 .../flink/table/catalog/DatabaseCalciteSchema.java |  2 +-
 .../flink/table/plan/QueryOperationConverter.java  |  2 +-
 .../flink/table/plan/stats/FlinkStatistic.scala    |  2 +-
 8 files changed, 185 insertions(+), 8 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSource.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSource.java
index 386240b..42326ae 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSource.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSource.java
@@ -23,9 +23,12 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.plan.stats.TableStats;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.TableConnectorUtils;
 
+import java.util.Optional;
+
 import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
 
 /**
@@ -84,4 +87,11 @@ public interface TableSource<T> {
     default String explainSource() {
         return TableConnectorUtils.generateRuntimeName(getClass(), getTableSchema().getFieldNames());
     }
+
+    /**
+     * Returns the (optional) statistics for this {@link TableSource}.
+     */
+    default Optional<TableStats> getTableStats() {
+        return Optional.empty();
+    }
 }
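Because getTableStats() is a default method, existing sources keep compiling unchanged; a connector that knows its size can simply override it. The following minimal sketch (hypothetical class and parameter names, assuming that getTableSchema() plus a produced type are the only members a bare TableSource needs in this version, and leaving out the actual StreamTableSource/BatchTableSource read path) illustrates the intended override pattern:

    import java.util.Optional

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.table.api.TableSchema
    import org.apache.flink.table.plan.stats.TableStats
    import org.apache.flink.table.sources.TableSource
    import org.apache.flink.types.Row

    // Illustration-only source: the data-reading methods of a real connector are
    // omitted because they are unrelated to statistics.
    class StatsAwareTableSource(schema: TableSchema, stats: Option[TableStats])
      extends TableSource[Row] {

      override def getTableSchema: TableSchema = schema

      // Derive the legacy return type from the schema.
      override def getReturnType: TypeInformation[Row] =
        new RowTypeInfo(schema.getFieldTypes, schema.getFieldNames)

      // Report statistics when they are known; Optional.empty() keeps the same
      // behavior as sources that do not override the new default method.
      override def getTableStats: Optional[TableStats] =
        Optional.ofNullable(stats.orNull)
    }
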
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 3dbf25c..b603381 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -745,7 +745,13 @@ abstract class TableEnvironment(
     */
   def registerTableSource(name: String, tableSource: TableSource[_]): Unit = {
     checkValidTableName(name)
-    registerTableSourceInternal(name, tableSource, FlinkStatistic.UNKNOWN, replace = false)
+    registerTableSourceInternal(
+      name,
+      tableSource,
+      FlinkStatistic.builder()
+        .tableStats(tableSource.getTableStats.orElse(null))
+        .build(),
+      replace = false)
   }
 
   /**
@@ -758,7 +764,13 @@
   def registerOrReplaceTableSource(name: String, tableSource: TableSource[_]): Unit = {
     checkValidTableName(name)
-    registerTableSourceInternal(name, tableSource, FlinkStatistic.UNKNOWN, replace = true)
+    registerTableSourceInternal(
+      name,
+      tableSource,
+      FlinkStatistic.builder()
+        .tableStats(tableSource.getTableStats.orElse(null))
+        .build(),
+      replace = true)
   }
 
   /**
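With registration now deriving the statistic from the source itself, nothing changes on the user-facing API. A minimal usage sketch (hypothetical table name and row count, assuming a TableEnvironment in scope as tableEnv and the StatsAwareTableSource sketched above):

    val schema = TableSchema.builder()
      .field("id", DataTypes.INT())
      .field("amount", DataTypes.BIGINT())
      .build()

    // registerTableSource now reads getTableStats() and builds the FlinkStatistic
    // from it, instead of always registering the table with FlinkStatistic.UNKNOWN.
    tableEnv.registerTableSource(
      "orders",
      new StatsAwareTableSource(schema, Some(new TableStats(1000000L))))
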
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/batch/TableStatsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/batch/TableStatsTest.xml
new file mode 100644
index 0000000..2026490
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/batch/TableStatsTest.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testTableStatsWithSmallRightTable">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM x, y WHERE a = d AND c LIKE 'He%'
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(=($0, $3), LIKE($2, _UTF-16LE'He%'))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+      +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], isBroadcast=[true], build=[right])
+:- Calc(select=[a, b, c], where=[LIKE(c, _UTF-16LE'He%')])
+:  +- TableSourceScan(table=[[x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
++- Exchange(distribution=[broadcast])
+   +- TableSourceScan(table=[[y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTableStatsWithSmallLeftTable">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM x, y WHERE a = d AND c LIKE 'He%'
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(=($0, $3), LIKE($2, _UTF-16LE'He%'))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+      +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], isBroadcast=[true], build=[left])
+:- Exchange(distribution=[broadcast])
+:  +- Calc(select=[a, b, c], where=[LIKE(c, _UTF-16LE'He%')])
+:     +- TableSourceScan(table=[[x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
++- TableSourceScan(table=[[y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/batch/TableStatsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/batch/TableStatsTest.scala
new file mode 100644
index 0000000..2e3c5db
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/batch/TableStatsTest.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch
+
+import org.apache.flink.table.api.{DataTypes, PlannerConfigOptions, TableConfigOptions, TableSchema}
+import org.apache.flink.table.plan.stats.TableStats
+import org.apache.flink.table.sources.TableSource
+import org.apache.flink.table.util.{TableTestBase, TestTableSource}
+import org.junit.Test
+
+class TableStatsTest extends TableTestBase {
+
+  @Test
+  def testTableStatsWithSmallRightTable(): Unit = {
+    val util = batchTestUtil()
+    util.tableEnv.registerTableSource("x", createTableX(new TableStats(10000000L)))
+    util.tableEnv.registerTableSource("y", createTableY(new TableStats(100L)))
+    util.tableEnv.getConfig.getConf.setString(
+      TableConfigOptions.SQL_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin")
+    util.tableEnv.getConfig.getConf.setLong(
+      PlannerConfigOptions.SQL_OPTIMIZER_HASH_JOIN_BROADCAST_THRESHOLD, 5000)
+    val sqlQuery =
+      """
+        |SELECT * FROM x, y WHERE a = d AND c LIKE 'He%'
+      """.stripMargin
+    // right table should be broadcast
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testTableStatsWithSmallLeftTable(): Unit = {
+    val util = batchTestUtil()
+    util.tableEnv.registerTableSource("x", createTableX(new TableStats(100L)))
+    util.tableEnv.registerTableSource("y", createTableY(new TableStats(10000000L)))
+    util.tableEnv.getConfig.getConf.setString(
+      TableConfigOptions.SQL_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin")
+    util.tableEnv.getConfig.getConf.setLong(
+      PlannerConfigOptions.SQL_OPTIMIZER_HASH_JOIN_BROADCAST_THRESHOLD, 5000)
+    val sqlQuery =
+      """
+        |SELECT * FROM x, y WHERE a = d AND c LIKE 'He%'
+      """.stripMargin
+    // left table should be broadcast
+    util.verifyPlan(sqlQuery)
+  }
+
+  private def createTableX(tableStats: TableStats): TableSource[_] = {
+    val schema = TableSchema.builder()
+      .field("a", DataTypes.INT()) // 4
+      .field("b", DataTypes.BIGINT()) // 8
+      .field("c", DataTypes.STRING()) // 12
+      .build()
+    new TestTableSource(true, schema, Option(tableStats))
+  }
+
+  private def createTableY(tableStats: TableStats): TableSource[_] = {
+    val schema = TableSchema.builder()
+      .field("d", DataTypes.INT())
+      .field("e", DataTypes.BIGINT())
+      .field("f", DataTypes.STRING())
+      .build()
+    new TestTableSource(true, schema, Option(tableStats))
+  }
+}
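Why the expected plans flip the broadcast side: a rough back-of-the-envelope check, assuming the broadcast decision compares the estimated data size (row count from TableStats times an estimated row width in bytes) against SQL_OPTIMIZER_HASH_JOIN_BROADCAST_THRESHOLD, which the tests set to 5000. The 4/8/12-byte widths are taken from the comments in createTableX:

    // Estimated width of one (INT, BIGINT, STRING) row, per the test comments.
    val rowWidth = 4 + 8 + 12                 // 24 bytes per row
    val smallSide = 100L * rowWidth           // 2,400 -> below the 5,000 threshold, broadcastable
    val largeSide = 10000000L * rowWidth      // 240,000,000 -> far above the threshold
    // Only the 100-row table qualifies, so the planner broadcasts y in the first
    // test (build=[right]) and x in the second (build=[left]).
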
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index 0d92a4d..6342018 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -33,7 +33,7 @@ import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, Tabl
 import org.apache.flink.table.plan.nodes.exec.ExecNode
 import org.apache.flink.table.plan.optimize.program.{FlinkBatchProgram, FlinkStreamProgram}
 import org.apache.flink.table.plan.schema.TableSourceTable
-import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.plan.stats.{FlinkStatistic, TableStats}
 import org.apache.flink.table.plan.util.{ExecNodePlanDumper, FlinkRelOptUtil}
 import org.apache.flink.table.runtime.utils.{BatchTableEnvUtil, TestingAppendTableSink, TestingRetractTableSink, TestingUpsertTableSink}
 import org.apache.flink.table.sinks._
@@ -47,13 +47,14 @@ import org.apache.flink.types.Row
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.sql.SqlExplainLevel
-
 import org.apache.commons.lang3.SystemUtils
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.Rule
 import org.junit.rules.{ExpectedException, TestName}
 
+import _root_.java.util.Optional
+
 import _root_.scala.collection.JavaConversions._
 
 /**
@@ -644,7 +645,7 @@ case class BatchTableTestUtil(test: TableTestBase) extends TableTestUtil(test) {
 /**
   * Batch/Stream [[org.apache.flink.table.sources.TableSource]] for testing.
   */
-class TestTableSource(isBatch: Boolean, schema: TableSchema)
+class TestTableSource(isBatch: Boolean, schema: TableSchema, tableStats: Option[TableStats] = None)
   extends StreamTableSource[BaseRow] {
 
   override def isBounded: Boolean = isBatch
@@ -661,4 +662,9 @@ class TestTableSource(isBatch: Boolean, schema: TableSchema)
   }
 
   override def getTableSchema: TableSchema = schema
+
+  /** Returns the statistics of the table, or an empty Optional if the statistics are unknown. */
+  override def getTableStats: Optional[TableStats] = {
+    Optional.ofNullable(tableStats.orNull)
+  }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
index 14ef5aa..356852f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
@@ -75,7 +75,7 @@ class DatabaseCalciteSchema implements Schema {
                 .map(tableSource -> new TableSourceTable<>(
                     tableSource,
                     !connectorTable.isBatch(),
-                    FlinkStatistic.UNKNOWN()))
+                    FlinkStatistic.of(tableSource.getTableStats().orElse(null))))
                 .orElseThrow(() -> new TableException("Cannot query a sink only table."));
         } else {
             throw new TableException("Unsupported table type: " + table);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
index 3c3eff2..5bc8206 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
@@ -278,7 +278,7 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod
             final Table relTable = new TableSourceTable<>(
                 tableSourceTable.getTableSource(),
                 !tableSourceTable.isBatch(),
-                FlinkStatistic.UNKNOWN());
+                FlinkStatistic.of(tableSourceTable.getTableSource().getTableStats().orElse(null)));
 
             CatalogReader catalogReader = (CatalogReader) relBuilder.getRelOptSchema();
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
index 1acc91f..754509e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
@@ -86,6 +86,6 @@ object FlinkStatistic {
    * @param tableStats The table statistics.
    * @return The generated FlinkStatistic
    */
-  def of(tableStats: TableStats): FlinkStatistic = new FlinkStatistic(Some(tableStats))
+  def of(tableStats: TableStats): FlinkStatistic = new FlinkStatistic(Option(tableStats))
 
 }
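The small change in FlinkStatistic.of is what makes the new call sites safe: they pass getTableStats().orElse(null), and Option(null) collapses to None while Some(null) would smuggle a null into the statistic. A quick REPL-style illustration (hypothetical values):

    import org.apache.flink.table.plan.stats.TableStats

    val unknown: TableStats = null   // what getTableStats().orElse(null) yields when no stats are reported

    Some(unknown).isDefined          // true  -- a Some that still wraps null
    Option(unknown).isDefined        // false -- Option(null) normalizes to None, i.e. "no statistics"
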
