This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ac972cd78da4ef1b972dbd2b9afc39e774a2f6fe
Author: yuzhao.cyz <[email protected]>
AuthorDate: Tue Mar 17 19:27:40 2020 +0800

    [FLINK-14338][table-planner][table-planner-blink] Remove usage of 
TableScanRule and use new TableScanFactory extension
    
    * This change was introduced in CALCITE-3769
---
 .../catalog/QueryOperationCatalogViewTable.java    |  7 ++-
 .../table/planner/delegation/PlannerContext.java   | 40 +++--------------
 .../planner/plan/rules/FlinkBatchRuleSets.scala    |  4 +-
 .../planner/plan/rules/FlinkStreamRuleSets.scala   |  4 +-
 .../planner/plan/schema/CatalogSourceTable.scala   | 14 +-----
 .../table/planner/plan/batch/sql/TableScanTest.xml | 50 ++++++++--------------
 .../flink/table/plan/rules/FlinkRuleSets.scala     | 11 +++--
 7 files changed, 43 insertions(+), 87 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
index 37bacb3..f184be0 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
 import org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable;
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
 
+import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.RelOptSchema;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.RelNode;
@@ -68,7 +69,11 @@ public class QueryOperationCatalogViewTable extends 
ExpandingPreparingTable {
 
        @Override
        public RelNode convertToRel(RelOptTable.ToRelContext context) {
-               FlinkRelBuilder relBuilder = 
FlinkRelBuilder.of(context.getCluster(), this.getRelOptSchema());
+               FlinkRelBuilder relBuilder = new FlinkRelBuilder(
+                               // Sets up the view expander.
+                               Contexts.of(context, 
context.getCluster().getPlanner().getContext()),
+                               context.getCluster(),
+                               this.getRelOptSchema());
 
                return 
relBuilder.queryOperation(catalogView.getQueryOperation()).build();
        }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
index 20823ea..23372bf 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
@@ -44,7 +44,6 @@ import 
org.apache.flink.table.planner.codegen.ExpressionReducer;
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
 import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader;
 import org.apache.flink.table.planner.plan.cost.FlinkCostFactory;
-import org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.TableConfigUtils;
 
@@ -54,11 +53,8 @@ import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.ViewExpanders;
 import org.apache.calcite.plan.volcano.VolcanoPlanner;
-import org.apache.calcite.rel.core.RelFactories;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.rex.RexBuilder;
@@ -159,41 +155,20 @@ public class PlannerContext {
         * @return configured rel builder
         */
        public FlinkRelBuilder createRelBuilder(String currentCatalog, String 
currentDatabase) {
-               FlinkCalciteCatalogReader relOptSchema = 
createCatalogReader(false, currentCatalog, currentDatabase);
+               FlinkCalciteCatalogReader relOptSchema = createCatalogReader(
+                               false,
+                               currentCatalog,
+                               currentDatabase);
 
-               Context chain = Contexts.chain(
+               Context chain = Contexts.of(
                        context,
-                       // We need to overwrite the default scan factory, which 
does not
-                       // expand views. The expandingScanFactory uses the 
FlinkPlanner to translate a view
-                       // into a rel tree, before applying any subsequent 
rules.
-                       Contexts.of(expandingScanFactory(
-                                       createFlinkPlanner(currentCatalog, 
currentDatabase).createToRelContext()))
+                       // Sets up the ViewExpander explicitly for 
FlinkRelBuilder.
+                       createFlinkPlanner(currentCatalog, 
currentDatabase).createToRelContext()
                );
                return new FlinkRelBuilder(chain, cluster, relOptSchema);
        }
 
        /**
-        * Creates a {@link RelFactories.TableScanFactory} that uses a
-        * {@link org.apache.calcite.plan.RelOptTable.ViewExpander} to handle
-        * {@link ExpandingPreparingTable} instances, and falls back to a 
default
-        * factory for other tables.
-        *
-        * @param viewExpander View expander
-        * @return Table scan factory
-        */
-       private static RelFactories.TableScanFactory expandingScanFactory(
-                       RelOptTable.ViewExpander viewExpander) {
-               return (cluster, table) -> {
-                       if (table instanceof ExpandingPreparingTable) {
-                               final RelOptTable.ToRelContext toRelContext =
-                                       
ViewExpanders.toRelContext(viewExpander, cluster);
-                               return table.toRel(toRelContext);
-                       }
-                       return 
RelFactories.DEFAULT_TABLE_SCAN_FACTORY.createScan(cluster, table);
-               };
-       }
-
-       /**
         * Creates a configured {@link FlinkPlannerImpl} for a planning session.
         *
         * @param currentCatalog the current default catalog to look for first 
during planning.
@@ -293,7 +268,6 @@ public class PlannerContext {
                return 
JavaScalaConversionUtil.toJava(calciteConfig.getSqlToRelConverterConfig()).orElseGet(
                                () -> SqlToRelConverter.configBuilder()
                                                .withTrimUnusedFields(false)
-                                               .withConvertTableAccess(true)
                                                
.withInSubQueryThreshold(Integer.MAX_VALUE)
                                                .withExpand(false)
                                                
.withRelBuilderFactory(FlinkRelFactories.FLINK_REL_BUILDER())
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 104c8f3..77b5fc1 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -55,8 +55,7 @@ object FlinkBatchRuleSets {
     */
   val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList(
     LogicalCorrelateToJoinFromTemporalTableRule.WITH_FILTER,
-    LogicalCorrelateToJoinFromTemporalTableRule.WITHOUT_FILTER,
-    TableScanRule.INSTANCE)
+    LogicalCorrelateToJoinFromTemporalTableRule.WITHOUT_FILTER)
 
   val POST_EXPAND_CLEAN_UP_RULES: RuleSet = RuleSets.ofList(
     EnumerableToLogicalTableScan.INSTANCE)
@@ -65,7 +64,6 @@ object FlinkBatchRuleSets {
     * Convert table references before query decorrelation.
     */
   val TABLE_REF_RULES: RuleSet = RuleSets.ofList(
-    TableScanRule.INSTANCE,
     EnumerableToLogicalTableScan.INSTANCE
   )
 
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index 03e294e..83d6a41 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -56,8 +56,7 @@ object FlinkStreamRuleSets {
   val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList(
     LogicalCorrelateToJoinFromTemporalTableRule.WITH_FILTER,
     LogicalCorrelateToJoinFromTemporalTableRule.WITHOUT_FILTER,
-    LogicalCorrelateToJoinFromTemporalTableFunctionRule.INSTANCE,
-    TableScanRule.INSTANCE)
+    LogicalCorrelateToJoinFromTemporalTableFunctionRule.INSTANCE)
 
   val POST_EXPAND_CLEAN_UP_RULES: RuleSet = RuleSets.ofList(
     EnumerableToLogicalTableScan.INSTANCE)
@@ -66,7 +65,6 @@ object FlinkStreamRuleSets {
     * Convert table references before query decorrelation.
     */
   val TABLE_REF_RULES: RuleSet = RuleSets.ofList(
-    TableScanRule.INSTANCE,
     EnumerableToLogicalTableScan.INSTANCE
   )
 
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
index 72764fe..094de36 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
@@ -25,14 +25,13 @@ import org.apache.flink.table.factories.{TableFactoryUtil, 
TableSourceFactory, T
 import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkRelBuilder, 
FlinkTypeFactory}
 import org.apache.flink.table.planner.catalog.CatalogSchemaTable
 import org.apache.flink.table.sources.{StreamTableSource, TableSource, 
TableSourceValidation}
-import org.apache.flink.table.utils.TableConnectorUtils.generateRuntimeName
+
 import org.apache.calcite.plan.{RelOptSchema, RelOptTable}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.logical.LogicalTableScan
 import org.apache.flink.table.types.logical.{TimestampKind, TimestampType}
 
-import java.util
 import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
@@ -64,15 +63,6 @@ class CatalogSourceTable[T](
       .toMap
   }
 
-  override def getQualifiedName: JList[String] = {
-    // Do not explain source, we already have full names, table source should 
be created in toRel.
-    val ret = new util.ArrayList[String](names)
-    // Add class name to distinguish TableSourceTable.
-    val name = generateRuntimeName(getClass, 
catalogTable.getSchema.getFieldNames)
-    ret.add(s"catalog_source: [$name]")
-    ret
-  }
-
   override def toRel(context: RelOptTable.ToRelContext): RelNode = {
     val cluster = context.getCluster
     val flinkContext = cluster
@@ -104,7 +94,7 @@ class CatalogSourceTable[T](
       .toArray
     // Copy this table with physical scan row type.
     val newRelTable = tableSourceTable.copy(tableSource, physicalFields)
-    val scan = LogicalTableScan.create(cluster, newRelTable)
+    val scan = LogicalTableScan.create(cluster, newRelTable, 
context.getTableHints)
     val relBuilder = FlinkRelBuilder.of(cluster, getRelOptSchema)
     relBuilder.push(scan)
 
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml
index 63bea00..1c75dff 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml
@@ -19,7 +19,6 @@ limitations under the License.
   <TestCase name="testDDLTableScan">
     <Resource name="sql">
       <![CDATA[SELECT * FROM src WHERE a > 1]]>
-
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
@@ -27,80 +26,66 @@ LogicalProject(ts=[$0], a=[$1], b=[$2])
 +- LogicalFilter(condition=[>($1, 1)])
    +- LogicalTableScan(table=[[default_catalog, default_database, src, source: 
[CollectionTableSource(ts, a, b)]]])
 ]]>
-
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[ts, a, b], where=[>(a, 1)])
 +- TableSourceScan(table=[[default_catalog, default_database, src, source: 
[CollectionTableSource(ts, a, b)]]], fields=[ts, a, b])
 ]]>
-
     </Resource>
   </TestCase>
-  <TestCase name="testTableSourceScan">
+  <TestCase name="testDDLWithComputedColumn">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM MyTable]]>
-
+      <![CDATA[SELECT * FROM computed_column_t]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$2])
-+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
+LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], 
e=[my_udf($0)])
++- LogicalTableScan(table=[[default_catalog, default_database, 
computed_column_t, source: [CollectionTableSource(a, b)]]])
 ]]>
-
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-TableSourceScan(table=[[default_catalog, default_database, MyTable, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c])
+Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
++- TableSourceScan(table=[[default_catalog, default_database, 
computed_column_t, source: [CollectionTableSource(a, b)]]], fields=[a, b])
 ]]>
-
     </Resource>
   </TestCase>
-
   <TestCase name="testDDLWithWatermarkComputedColumn">
     <Resource name="sql">
       <![CDATA[SELECT * FROM c_watermark_t]]>
-
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], 
e=[my_udf($0)])
 +- LogicalTableScan(table=[[default_catalog, default_database, c_watermark_t, 
source: [CollectionTableSource(a, b)]]])
 ]]>
-
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
 +- TableSourceScan(table=[[default_catalog, default_database, c_watermark_t, 
source: [CollectionTableSource(a, b)]]], fields=[a, b])
 ]]>
-
     </Resource>
   </TestCase>
-  <TestCase name="testDDLWithComputedColumn">
-    <Resource name="sql">
-      <![CDATA[SELECT * FROM computed_column_t]]>
-
-    </Resource>
+  <TestCase name="testTableApiScanWithComputedColumn">
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], 
e=[my_udf($0)])
 +- LogicalTableScan(table=[[default_catalog, default_database, 
computed_column_t, source: [CollectionTableSource(a, b)]]])
 ]]>
-
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
 +- TableSourceScan(table=[[default_catalog, default_database, 
computed_column_t, source: [CollectionTableSource(a, b)]]], fields=[a, b])
 ]]>
-
     </Resource>
   </TestCase>
   <TestCase name="testTableApiScanWithDDL">
     <Resource name="planBefore">
       <![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, t1, 
catalog_source: [CatalogSourceTable(a, b)]]])
+LogicalTableScan(table=[[default_catalog, default_database, t1, source: 
[CollectionTableSource(a, b)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -109,11 +94,10 @@ TableSourceScan(table=[[default_catalog, default_database, 
t1, source: [Collecti
 ]]>
     </Resource>
   </TestCase>
-
   <TestCase name="testTableApiScanWithTemporaryTable">
     <Resource name="planBefore">
       <![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, t1, 
catalog_source: [CatalogSourceTable(word)]]])
+LogicalTableScan(table=[[default_catalog, default_database, t1, source: 
[CsvTableSource(read fields: word)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -122,11 +106,11 @@ TableSourceScan(table=[[default_catalog, 
default_database, t1, source: [CsvTable
 ]]>
     </Resource>
   </TestCase>
-
   <TestCase name="testTableApiScanWithWatermark">
     <Resource name="planBefore">
       <![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, c_watermark_t, 
catalog_source: [CatalogSourceTable(a, b, c, d, e)]]])
+LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], 
e=[my_udf($0)])
++- LogicalTableScan(table=[[default_catalog, default_database, c_watermark_t, 
source: [CollectionTableSource(a, b)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -136,17 +120,19 @@ Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, 
my_udf(a) AS e])
 ]]>
     </Resource>
   </TestCase>
-
-  <TestCase name="testTableApiScanWithComputedColumn">
+  <TestCase name="testTableSourceScan">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable]]>
+    </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, computed_column_t, 
catalog_source: [CatalogSourceTable(a, b, c, d, e)]]])
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
-+- TableSourceScan(table=[[default_catalog, default_database, 
computed_column_t, source: [CollectionTableSource(a, b)]]], fields=[a, b])
+TableSourceScan(table=[[default_catalog, default_database, MyTable, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 534abe4..df61b48 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -27,6 +27,8 @@ import org.apache.flink.table.plan.rules.dataSet._
 import org.apache.flink.table.plan.rules.datastream._
 import 
org.apache.flink.table.plan.rules.logical.{ExtendedAggregateExtractProjectRule, 
_}
 
+import org.apache.calcite.rel.logical.{LogicalJoin, LogicalProject}
+
 object FlinkRuleSets {
 
   /**
@@ -42,8 +44,7 @@ object FlinkRuleSets {
     * can create new plan nodes.
     */
   val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList(
-    LogicalCorrelateToTemporalTableJoinRule.INSTANCE,
-    TableScanRule.INSTANCE)
+    LogicalCorrelateToTemporalTableJoinRule.INSTANCE)
 
   val POST_EXPAND_CLEAN_UP_RULES: RuleSet = RuleSets.ofList(
     EnumerableToLogicalTableScan.INSTANCE)
@@ -69,7 +70,11 @@ object FlinkRuleSets {
     FilterProjectTransposeRule.INSTANCE,
     // push a projection to the children of a join
     // push all expressions to handle the time indicator correctly
-    new ProjectJoinTransposeRule(PushProjector.ExprCondition.FALSE, 
RelFactories.LOGICAL_BUILDER),
+    new ProjectJoinTransposeRule(
+      classOf[LogicalProject],
+      classOf[LogicalJoin],
+      PushProjector.ExprCondition.FALSE,
+      RelFactories.LOGICAL_BUILDER),
     // merge projections
     ProjectMergeRule.INSTANCE,
     // remove identity project

Reply via email to