This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ac972cd78da4ef1b972dbd2b9afc39e774a2f6fe Author: yuzhao.cyz <[email protected]> AuthorDate: Tue Mar 17 19:27:40 2020 +0800 [FLINK-14338][table-planner][table-planner-blink] Remove usage of TableScanRule and use new TableScanFactory extension * This change was introduced in CALCITE-3769 --- .../catalog/QueryOperationCatalogViewTable.java | 7 ++- .../table/planner/delegation/PlannerContext.java | 40 +++-------------- .../planner/plan/rules/FlinkBatchRuleSets.scala | 4 +- .../planner/plan/rules/FlinkStreamRuleSets.scala | 4 +- .../planner/plan/schema/CatalogSourceTable.scala | 14 +----- .../table/planner/plan/batch/sql/TableScanTest.xml | 50 ++++++++-------------- .../flink/table/plan/rules/FlinkRuleSets.scala | 11 +++-- 7 files changed, 43 insertions(+), 87 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java index 37bacb3..f184be0 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java @@ -25,6 +25,7 @@ import org.apache.flink.table.planner.calcite.FlinkRelBuilder; import org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable; import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.calcite.plan.Contexts; import org.apache.calcite.plan.RelOptSchema; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.rel.RelNode; @@ -68,7 +69,11 @@ public class QueryOperationCatalogViewTable extends ExpandingPreparingTable { @Override public RelNode convertToRel(RelOptTable.ToRelContext context) { - FlinkRelBuilder relBuilder = FlinkRelBuilder.of(context.getCluster(), this.getRelOptSchema()); + FlinkRelBuilder relBuilder = new FlinkRelBuilder( + // Sets up the view expander. + Contexts.of(context, context.getCluster().getPlanner().getContext()), + context.getCluster(), + this.getRelOptSchema()); return relBuilder.queryOperation(catalogView.getQueryOperation()).build(); } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java index 20823ea..23372bf 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java @@ -44,7 +44,6 @@ import org.apache.flink.table.planner.codegen.ExpressionReducer; import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable; import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader; import org.apache.flink.table.planner.plan.cost.FlinkCostFactory; -import org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable; import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; import org.apache.flink.table.planner.utils.TableConfigUtils; @@ -54,11 +53,8 @@ import org.apache.calcite.plan.Context; import org.apache.calcite.plan.Contexts; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptPlanner; -import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelTraitDef; -import org.apache.calcite.plan.ViewExpanders; import org.apache.calcite.plan.volcano.VolcanoPlanner; -import org.apache.calcite.rel.core.RelFactories; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeSystem; import org.apache.calcite.rex.RexBuilder; @@ -159,41 +155,20 @@ public class PlannerContext { * @return configured rel builder */ public FlinkRelBuilder createRelBuilder(String currentCatalog, String currentDatabase) { - FlinkCalciteCatalogReader relOptSchema = createCatalogReader(false, currentCatalog, currentDatabase); + FlinkCalciteCatalogReader relOptSchema = createCatalogReader( + false, + currentCatalog, + currentDatabase); - Context chain = Contexts.chain( + Context chain = Contexts.of( context, - // We need to overwrite the default scan factory, which does not - // expand views. The expandingScanFactory uses the FlinkPlanner to translate a view - // into a rel tree, before applying any subsequent rules. - Contexts.of(expandingScanFactory( - createFlinkPlanner(currentCatalog, currentDatabase).createToRelContext())) + // Sets up the ViewExpander explicitly for FlinkRelBuilder. + createFlinkPlanner(currentCatalog, currentDatabase).createToRelContext() ); return new FlinkRelBuilder(chain, cluster, relOptSchema); } /** - * Creates a {@link RelFactories.TableScanFactory} that uses a - * {@link org.apache.calcite.plan.RelOptTable.ViewExpander} to handle - * {@link ExpandingPreparingTable} instances, and falls back to a default - * factory for other tables. - * - * @param viewExpander View expander - * @return Table scan factory - */ - private static RelFactories.TableScanFactory expandingScanFactory( - RelOptTable.ViewExpander viewExpander) { - return (cluster, table) -> { - if (table instanceof ExpandingPreparingTable) { - final RelOptTable.ToRelContext toRelContext = - ViewExpanders.toRelContext(viewExpander, cluster); - return table.toRel(toRelContext); - } - return RelFactories.DEFAULT_TABLE_SCAN_FACTORY.createScan(cluster, table); - }; - } - - /** * Creates a configured {@link FlinkPlannerImpl} for a planning session. * * @param currentCatalog the current default catalog to look for first during planning. @@ -293,7 +268,6 @@ public class PlannerContext { return JavaScalaConversionUtil.toJava(calciteConfig.getSqlToRelConverterConfig()).orElseGet( () -> SqlToRelConverter.configBuilder() .withTrimUnusedFields(false) - .withConvertTableAccess(true) .withInSubQueryThreshold(Integer.MAX_VALUE) .withExpand(false) .withRelBuilderFactory(FlinkRelFactories.FLINK_REL_BUILDER()) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala index 104c8f3..77b5fc1 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala @@ -55,8 +55,7 @@ object FlinkBatchRuleSets { */ val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList( LogicalCorrelateToJoinFromTemporalTableRule.WITH_FILTER, - LogicalCorrelateToJoinFromTemporalTableRule.WITHOUT_FILTER, - TableScanRule.INSTANCE) + LogicalCorrelateToJoinFromTemporalTableRule.WITHOUT_FILTER) val POST_EXPAND_CLEAN_UP_RULES: RuleSet = RuleSets.ofList( EnumerableToLogicalTableScan.INSTANCE) @@ -65,7 +64,6 @@ object FlinkBatchRuleSets { * Convert table references before query decorrelation. */ val TABLE_REF_RULES: RuleSet = RuleSets.ofList( - TableScanRule.INSTANCE, EnumerableToLogicalTableScan.INSTANCE ) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala index 03e294e..83d6a41 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala @@ -56,8 +56,7 @@ object FlinkStreamRuleSets { val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList( LogicalCorrelateToJoinFromTemporalTableRule.WITH_FILTER, LogicalCorrelateToJoinFromTemporalTableRule.WITHOUT_FILTER, - LogicalCorrelateToJoinFromTemporalTableFunctionRule.INSTANCE, - TableScanRule.INSTANCE) + LogicalCorrelateToJoinFromTemporalTableFunctionRule.INSTANCE) val POST_EXPAND_CLEAN_UP_RULES: RuleSet = RuleSets.ofList( EnumerableToLogicalTableScan.INSTANCE) @@ -66,7 +65,6 @@ object FlinkStreamRuleSets { * Convert table references before query decorrelation. */ val TABLE_REF_RULES: RuleSet = RuleSets.ofList( - TableScanRule.INSTANCE, EnumerableToLogicalTableScan.INSTANCE ) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala index 72764fe..094de36 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala @@ -25,14 +25,13 @@ import org.apache.flink.table.factories.{TableFactoryUtil, TableSourceFactory, T import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkRelBuilder, FlinkTypeFactory} import org.apache.flink.table.planner.catalog.CatalogSchemaTable import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceValidation} -import org.apache.flink.table.utils.TableConnectorUtils.generateRuntimeName + import org.apache.calcite.plan.{RelOptSchema, RelOptTable} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.logical.LogicalTableScan import org.apache.flink.table.types.logical.{TimestampKind, TimestampType} -import java.util import java.util.{List => JList} import scala.collection.JavaConversions._ @@ -64,15 +63,6 @@ class CatalogSourceTable[T]( .toMap } - override def getQualifiedName: JList[String] = { - // Do not explain source, we already have full names, table source should be created in toRel. - val ret = new util.ArrayList[String](names) - // Add class name to distinguish TableSourceTable. - val name = generateRuntimeName(getClass, catalogTable.getSchema.getFieldNames) - ret.add(s"catalog_source: [$name]") - ret - } - override def toRel(context: RelOptTable.ToRelContext): RelNode = { val cluster = context.getCluster val flinkContext = cluster @@ -104,7 +94,7 @@ class CatalogSourceTable[T]( .toArray // Copy this table with physical scan row type. val newRelTable = tableSourceTable.copy(tableSource, physicalFields) - val scan = LogicalTableScan.create(cluster, newRelTable) + val scan = LogicalTableScan.create(cluster, newRelTable, context.getTableHints) val relBuilder = FlinkRelBuilder.of(cluster, getRelOptSchema) relBuilder.push(scan) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml index 63bea00..1c75dff 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml @@ -19,7 +19,6 @@ limitations under the License. <TestCase name="testDDLTableScan"> <Resource name="sql"> <![CDATA[SELECT * FROM src WHERE a > 1]]> - </Resource> <Resource name="planBefore"> <![CDATA[ @@ -27,80 +26,66 @@ LogicalProject(ts=[$0], a=[$1], b=[$2]) +- LogicalFilter(condition=[>($1, 1)]) +- LogicalTableScan(table=[[default_catalog, default_database, src, source: [CollectionTableSource(ts, a, b)]]]) ]]> - </Resource> <Resource name="planAfter"> <![CDATA[ Calc(select=[ts, a, b], where=[>(a, 1)]) +- TableSourceScan(table=[[default_catalog, default_database, src, source: [CollectionTableSource(ts, a, b)]]], fields=[ts, a, b]) ]]> - </Resource> </TestCase> - <TestCase name="testTableSourceScan"> + <TestCase name="testDDLWithComputedColumn"> <Resource name="sql"> - <![CDATA[SELECT * FROM MyTable]]> - + <![CDATA[SELECT * FROM computed_column_t]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalProject(a=[$0], b=[$1], c=[$2]) -+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) +LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], e=[my_udf($0)]) ++- LogicalTableScan(table=[[default_catalog, default_database, computed_column_t, source: [CollectionTableSource(a, b)]]]) ]]> - </Resource> <Resource name="planAfter"> <![CDATA[ -TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e]) ++- TableSourceScan(table=[[default_catalog, default_database, computed_column_t, source: [CollectionTableSource(a, b)]]], fields=[a, b]) ]]> - </Resource> </TestCase> - <TestCase name="testDDLWithWatermarkComputedColumn"> <Resource name="sql"> <![CDATA[SELECT * FROM c_watermark_t]]> - </Resource> <Resource name="planBefore"> <![CDATA[ LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], e=[my_udf($0)]) +- LogicalTableScan(table=[[default_catalog, default_database, c_watermark_t, source: [CollectionTableSource(a, b)]]]) ]]> - </Resource> <Resource name="planAfter"> <![CDATA[ Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e]) +- TableSourceScan(table=[[default_catalog, default_database, c_watermark_t, source: [CollectionTableSource(a, b)]]], fields=[a, b]) ]]> - </Resource> </TestCase> - <TestCase name="testDDLWithComputedColumn"> - <Resource name="sql"> - <![CDATA[SELECT * FROM computed_column_t]]> - - </Resource> + <TestCase name="testTableApiScanWithComputedColumn"> <Resource name="planBefore"> <![CDATA[ LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], e=[my_udf($0)]) +- LogicalTableScan(table=[[default_catalog, default_database, computed_column_t, source: [CollectionTableSource(a, b)]]]) ]]> - </Resource> <Resource name="planAfter"> <![CDATA[ Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e]) +- TableSourceScan(table=[[default_catalog, default_database, computed_column_t, source: [CollectionTableSource(a, b)]]], fields=[a, b]) ]]> - </Resource> </TestCase> <TestCase name="testTableApiScanWithDDL"> <Resource name="planBefore"> <![CDATA[ -LogicalTableScan(table=[[default_catalog, default_database, t1, catalog_source: [CatalogSourceTable(a, b)]]]) +LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b)]]]) ]]> </Resource> <Resource name="planAfter"> @@ -109,11 +94,10 @@ TableSourceScan(table=[[default_catalog, default_database, t1, source: [Collecti ]]> </Resource> </TestCase> - <TestCase name="testTableApiScanWithTemporaryTable"> <Resource name="planBefore"> <![CDATA[ -LogicalTableScan(table=[[default_catalog, default_database, t1, catalog_source: [CatalogSourceTable(word)]]]) +LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CsvTableSource(read fields: word)]]]) ]]> </Resource> <Resource name="planAfter"> @@ -122,11 +106,11 @@ TableSourceScan(table=[[default_catalog, default_database, t1, source: [CsvTable ]]> </Resource> </TestCase> - <TestCase name="testTableApiScanWithWatermark"> <Resource name="planBefore"> <![CDATA[ -LogicalTableScan(table=[[default_catalog, default_database, c_watermark_t, catalog_source: [CatalogSourceTable(a, b, c, d, e)]]]) +LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], e=[my_udf($0)]) ++- LogicalTableScan(table=[[default_catalog, default_database, c_watermark_t, source: [CollectionTableSource(a, b)]]]) ]]> </Resource> <Resource name="planAfter"> @@ -136,17 +120,19 @@ Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e]) ]]> </Resource> </TestCase> - - <TestCase name="testTableApiScanWithComputedColumn"> + <TestCase name="testTableSourceScan"> + <Resource name="sql"> + <![CDATA[SELECT * FROM MyTable]]> + </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalTableScan(table=[[default_catalog, default_database, computed_column_t, catalog_source: [CatalogSourceTable(a, b, c, d, e)]]]) +LogicalProject(a=[$0], b=[$1], c=[$2]) ++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ -Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e]) -+- TableSourceScan(table=[[default_catalog, default_database, computed_column_t, source: [CollectionTableSource(a, b)]]], fields=[a, b]) +TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala index 534abe4..df61b48 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala @@ -27,6 +27,8 @@ import org.apache.flink.table.plan.rules.dataSet._ import org.apache.flink.table.plan.rules.datastream._ import org.apache.flink.table.plan.rules.logical.{ExtendedAggregateExtractProjectRule, _} +import org.apache.calcite.rel.logical.{LogicalJoin, LogicalProject} + object FlinkRuleSets { /** @@ -42,8 +44,7 @@ object FlinkRuleSets { * can create new plan nodes. */ val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList( - LogicalCorrelateToTemporalTableJoinRule.INSTANCE, - TableScanRule.INSTANCE) + LogicalCorrelateToTemporalTableJoinRule.INSTANCE) val POST_EXPAND_CLEAN_UP_RULES: RuleSet = RuleSets.ofList( EnumerableToLogicalTableScan.INSTANCE) @@ -69,7 +70,11 @@ object FlinkRuleSets { FilterProjectTransposeRule.INSTANCE, // push a projection to the children of a join // push all expressions to handle the time indicator correctly - new ProjectJoinTransposeRule(PushProjector.ExprCondition.FALSE, RelFactories.LOGICAL_BUILDER), + new ProjectJoinTransposeRule( + classOf[LogicalProject], + classOf[LogicalJoin], + PushProjector.ExprCondition.FALSE, + RelFactories.LOGICAL_BUILDER), // merge projections ProjectMergeRule.INSTANCE, // remove identity project
