danny0405 commented on a change in pull request #10224:
[FLINK-14716][table-planner-blink] Cooperate computed column with push down
rules
URL: https://github.com/apache/flink/pull/10224#discussion_r347250856
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
##########
@@ -18,88 +18,63 @@
package org.apache.flink.table.planner.catalog;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.QueryOperationCatalogView;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.TableSourceQueryOperation;
import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
-import org.apache.flink.table.planner.plan.schema.FlinkTable;
+import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptSchema;
+import org.apache.calcite.plan.RelOptTable.ToRelContext;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.schema.TranslatableTable;
-import java.util.Arrays;
import java.util.List;
-import java.util.stream.Collectors;
/**
* A bridge between a Flink's specific {@link QueryOperationCatalogView} and a
Calcite's
- * {@link org.apache.calcite.schema.Table}. It implements {@link
TranslatableTable} interface. This enables
- * direct translation from {@link
org.apache.flink.table.operations.QueryOperation} to {@link RelNode}.
- *
- * <p>NOTE: Due to legacy inconsistency in null handling in the {@link
TableSchema} the translation might introduce
- * additional cast to comply with manifested schema in
- * {@link QueryOperationCatalogViewTable#getRowType(RelDataTypeFactory)}.
+ * {@link org.apache.calcite.plan.RelOptTable}. It implements the conversion
from
+ * {@link org.apache.flink.table.operations.QueryOperation} to
+ * {@link org.apache.calcite.rel.RelNode}.
*/
-@Internal
-public class QueryOperationCatalogViewTable extends FlinkTable implements
TranslatableTable {
+public class QueryOperationCatalogViewTable extends FlinkPreparingTableBase {
private final QueryOperationCatalogView catalogView;
- private final RelProtoDataType rowType;
- private final FlinkStatistic statistic;
-
- public static QueryOperationCatalogViewTable
createCalciteTable(QueryOperationCatalogView catalogView) {
- return new QueryOperationCatalogViewTable(catalogView,
typeFactory -> {
- TableSchema tableSchema = catalogView.getSchema();
- List<String> fieldNames =
Arrays.asList(tableSchema.getFieldNames());
- List<LogicalType> fieldTypes =
Arrays.stream(tableSchema.getFieldDataTypes()).map(
-
LogicalTypeDataTypeConverter::fromDataTypeToLogicalType).collect(Collectors.toList());
- return ((FlinkTypeFactory)
typeFactory).buildRelNodeRowType(
-
JavaScalaConversionUtil.toScala(fieldNames),
-
JavaScalaConversionUtil.toScala(fieldTypes));
- }, FlinkStatistic.UNKNOWN()); // TODO supports statistic
- }
+ /** Creates a QueryOperationCatalogViewTable. */
private QueryOperationCatalogViewTable(
- QueryOperationCatalogView catalogView,
- RelProtoDataType rowType,
- FlinkStatistic statistic) {
+ RelOptSchema relOptSchema,
+ List<String> names,
+ RelDataType rowType,
+ QueryOperationCatalogView catalogView) {
+ super(relOptSchema, rowType, names, FlinkStatistic.UNKNOWN());
this.catalogView = catalogView;
- this.rowType = rowType;
- this.statistic = statistic;
}
- @Override
- public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable
relOptTable) {
- FlinkRelBuilder relBuilder =
FlinkRelBuilder.of(context.getCluster(), relOptTable);
-
- return
relBuilder.queryOperation(catalogView.getQueryOperation()).build();
+ public static QueryOperationCatalogViewTable create(RelOptSchema
schema, List<String> names,
+ RelDataType rowType, QueryOperationCatalogView view) {
+ return new QueryOperationCatalogViewTable(schema, names,
rowType, view);
}
@Override
- public RelDataType getRowType(RelDataTypeFactory typeFactory) {
- return rowType.apply(typeFactory);
- }
-
- public QueryOperationCatalogView getCatalogView() {
- return catalogView;
+ public List<String> getQualifiedName() {
+ final QueryOperation queryOperation =
catalogView.getQueryOperation();
+ if (queryOperation instanceof TableSourceQueryOperation) {
+ TableSourceQueryOperation tsqo =
(TableSourceQueryOperation) queryOperation;
+ return explainSourceAsString(tsqo.getTableSource());
+ }
+ return super.getQualifiedName();
}
@Override
- public FlinkStatistic getStatistic() {
- return statistic;
+ public FlinkPreparingTableBase copy(FlinkStatistic statistic) {
+ return new QueryOperationCatalogViewTable(relOptSchema, names,
rowType, catalogView);
Review comment:
It's constructor is private, and we always create it with
`FlinkStatistic.UNKNOWN`, we can add it back if we really support that.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services