danny0405 commented on a change in pull request #10224: 
[FLINK-14716][table-planner-blink] Cooperate computed column with push down 
rules
URL: https://github.com/apache/flink/pull/10224#discussion_r347250856
 
 

 ##########
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
 ##########
 @@ -18,88 +18,63 @@
 
 package org.apache.flink.table.planner.catalog;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.QueryOperationCatalogView;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.TableSourceQueryOperation;
 import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
-import org.apache.flink.table.planner.plan.schema.FlinkTable;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter;
-import org.apache.flink.table.types.logical.LogicalType;
 
-import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptSchema;
+import org.apache.calcite.plan.RelOptTable.ToRelContext;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.schema.TranslatableTable;
 
-import java.util.Arrays;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * A bridge between a Flink's specific {@link QueryOperationCatalogView} and a 
Calcite's
- * {@link org.apache.calcite.schema.Table}. It implements {@link 
TranslatableTable} interface. This enables
- * direct translation from {@link 
org.apache.flink.table.operations.QueryOperation} to {@link RelNode}.
- *
- * <p>NOTE: Due to legacy inconsistency in null handling in the {@link 
TableSchema} the translation might introduce
- * additional cast to comply with manifested schema in
- * {@link QueryOperationCatalogViewTable#getRowType(RelDataTypeFactory)}.
+ * {@link org.apache.calcite.plan.RelOptTable}. It implements the conversion 
from
+ * {@link org.apache.flink.table.operations.QueryOperation} to
+ * {@link org.apache.calcite.rel.RelNode}.
  */
-@Internal
-public class QueryOperationCatalogViewTable extends FlinkTable implements 
TranslatableTable {
+public class QueryOperationCatalogViewTable extends FlinkPreparingTableBase {
        private final QueryOperationCatalogView catalogView;
-       private final RelProtoDataType rowType;
-       private final FlinkStatistic statistic;
-
-       public static QueryOperationCatalogViewTable 
createCalciteTable(QueryOperationCatalogView catalogView) {
-               return new QueryOperationCatalogViewTable(catalogView, 
typeFactory -> {
-                       TableSchema tableSchema = catalogView.getSchema();
-                       List<String> fieldNames = 
Arrays.asList(tableSchema.getFieldNames());
-                       List<LogicalType> fieldTypes = 
Arrays.stream(tableSchema.getFieldDataTypes()).map(
-                                       
LogicalTypeDataTypeConverter::fromDataTypeToLogicalType).collect(Collectors.toList());
-                       return ((FlinkTypeFactory) 
typeFactory).buildRelNodeRowType(
-                                       
JavaScalaConversionUtil.toScala(fieldNames),
-                                       
JavaScalaConversionUtil.toScala(fieldTypes));
-               }, FlinkStatistic.UNKNOWN()); // TODO supports statistic
-       }
 
+       /** Creates a QueryOperationCatalogViewTable. */
        private QueryOperationCatalogViewTable(
-                       QueryOperationCatalogView catalogView,
-                       RelProtoDataType rowType,
-                       FlinkStatistic statistic) {
+                       RelOptSchema relOptSchema,
+                       List<String> names,
+                       RelDataType rowType,
+                       QueryOperationCatalogView catalogView) {
+               super(relOptSchema, rowType, names, FlinkStatistic.UNKNOWN());
                this.catalogView = catalogView;
-               this.rowType = rowType;
-               this.statistic = statistic;
        }
 
-       @Override
-       public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable 
relOptTable) {
-               FlinkRelBuilder relBuilder = 
FlinkRelBuilder.of(context.getCluster(), relOptTable);
-
-               return 
relBuilder.queryOperation(catalogView.getQueryOperation()).build();
+       public static QueryOperationCatalogViewTable create(RelOptSchema 
schema, List<String> names,
+                       RelDataType rowType, QueryOperationCatalogView view) {
+               return new QueryOperationCatalogViewTable(schema, names, 
rowType, view);
        }
 
        @Override
-       public RelDataType getRowType(RelDataTypeFactory typeFactory) {
-               return rowType.apply(typeFactory);
-       }
-
-       public QueryOperationCatalogView getCatalogView() {
-               return catalogView;
+       public List<String> getQualifiedName() {
+               final QueryOperation queryOperation = 
catalogView.getQueryOperation();
+               if (queryOperation instanceof TableSourceQueryOperation) {
+                       TableSourceQueryOperation tsqo = 
(TableSourceQueryOperation) queryOperation;
+                       return explainSourceAsString(tsqo.getTableSource());
+               }
+               return super.getQualifiedName();
        }
 
        @Override
-       public FlinkStatistic getStatistic() {
-               return statistic;
+       public FlinkPreparingTableBase copy(FlinkStatistic statistic) {
+               return new QueryOperationCatalogViewTable(relOptSchema, names, 
rowType, catalogView);
 
 Review comment:
   Its constructor is private, and we always create it with 
`FlinkStatistic.UNKNOWN`; we can add it back if we really support that.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to