[FLINK-3849] [table] Add FilterableTableSource interface and rules for pushing it (2)
This closes #3520. fix compilation failure fix compilation failure again. 1. Deep copy TableSource when we copy TableSourceScan 2. unify push project into scan rule for both batch and stream address comments. expand project list before creating new RexProgram update tests Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/78f22aae Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/78f22aae Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/78f22aae Branch: refs/heads/master Commit: 78f22aaec9bd7d39fdec8477335e5c9247d42030 Parents: 9f6cd2e Author: Kurt Young <[email protected]> Authored: Mon Mar 13 15:30:13 2017 +0800 Committer: Kurt Young <[email protected]> Committed: Fri Mar 17 18:01:50 2017 +0800 ---------------------------------------------------------------------- .../connectors/kafka/KafkaTableSource.java | 5 + .../flink/addons/hbase/HBaseTableSource.java | 7 +- .../flink/table/api/BatchTableEnvironment.scala | 2 +- .../table/api/StreamTableEnvironment.scala | 2 +- .../flink/table/api/TableEnvironment.scala | 12 - .../flink/table/calcite/RexNodeWrapper.scala | 106 ------ .../flink/table/plan/nodes/CommonCalc.scala | 3 +- .../table/plan/nodes/TableSourceScan.scala | 63 ++++ .../table/plan/nodes/dataset/BatchScan.scala | 21 +- .../nodes/dataset/BatchTableSourceScan.scala | 47 +-- .../table/plan/nodes/dataset/DataSetCalc.scala | 23 +- .../table/plan/nodes/dataset/DataSetScan.scala | 14 +- .../plan/nodes/datastream/DataStreamCalc.scala | 24 +- .../plan/nodes/datastream/DataStreamScan.scala | 8 +- .../plan/nodes/datastream/StreamScan.scala | 10 +- .../datastream/StreamTableSourceScan.scala | 52 +-- .../PushFilterIntoTableSourceScanRuleBase.scala | 104 ++++++ ...PushProjectIntoTableSourceScanRuleBase.scala | 57 +++ ...PushFilterIntoBatchTableSourceScanRule.scala | 58 +--- ...ushProjectIntoBatchTableSourceScanRule.scala | 48 +-- ...ushFilterIntoStreamTableSourceScanRule.scala | 58 +--- 
...shProjectIntoStreamTableSourceScanRule.scala | 40 +-- .../table/plan/schema/TableSourceTable.scala | 1 - .../util/RexProgramExpressionExtractor.scala | 163 --------- .../table/plan/util/RexProgramExtractor.scala | 183 ++++++++++ .../plan/util/RexProgramProjectExtractor.scala | 120 ------- .../table/plan/util/RexProgramRewriter.scala | 91 +++++ .../table/sources/FilterableTableSource.scala | 38 +- .../table/sources/ProjectableTableSource.scala | 9 +- .../flink/table/sources/TableSource.scala | 2 + .../flink/table/validate/FunctionCatalog.scala | 5 +- .../apache/flink/table/TableSourceTest.scala | 170 ++++++--- .../api/scala/batch/TableSourceITCase.scala | 4 +- .../api/scala/stream/TableSourceITCase.scala | 4 +- .../expressions/utils/ExpressionTestBase.scala | 4 +- .../RexProgramExpressionExtractorTest.scala | 182 ---------- .../plan/util/RexProgramExtractorTest.scala | 346 +++++++++++++++++++ .../util/RexProgramProjectExtractorTest.scala | 121 ------- .../plan/util/RexProgramRewriterTest.scala | 62 ++++ .../table/plan/util/RexProgramTestBase.scala | 80 +++++ .../flink/table/utils/CommonTestData.scala | 122 +------ .../table/utils/MockTableEnvironment.scala | 39 +++ .../table/utils/TestFilterableTableSource.scala | 134 +++++++ 43 files changed, 1452 insertions(+), 1192 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java index dd32bdd..506358d 100644 --- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java @@ -137,4 +137,9 @@ public abstract class KafkaTableSource implements StreamTableSource<Row> { protected DeserializationSchema<Row> getDeserializationSchema() { return deserializationSchema; } + + @Override + public String explainSource() { + return ""; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java index a1be23f..f709212 100644 --- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java +++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java @@ -108,7 +108,7 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable } @Override - public ProjectableTableSource<Row> projectFields(int[] fields) { + public HBaseTableSource projectFields(int[] fields) { String[] famNames = schema.getFamilyNames(); HBaseTableSource newTableSource = new HBaseTableSource(this.conf, tableName); // Extract the family from the given fields @@ -122,4 +122,9 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable } return newTableSource; } + + @Override + public String explainSource() { + return ""; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala index 7f27357..b48e9f9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala @@ -95,7 +95,7 @@ abstract class BatchTableEnvironment( tableSource match { case batchTableSource: BatchTableSource[_] => - registerTableInternal(name, new TableSourceTable(batchTableSource, this)) + registerTableInternal(name, new TableSourceTable(batchTableSource)) case _ => throw new TableException("Only BatchTableSource can be registered in " + "BatchTableEnvironment") http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index 7e9f38f..d927c3a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -136,7 +136,7 @@ abstract class StreamTableEnvironment( tableSource match { case streamTableSource: StreamTableSource[_] => - registerTableInternal(name, new TableSourceTable(streamTableSource, this)) + registerTableInternal(name, new TableSourceTable(streamTableSource)) case _ => throw new TableException("Only StreamTableSource can be registered in " + "StreamTableEnvironment") 
http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 291f49f..1dda3a8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -842,18 +842,6 @@ object TableEnvironment { } /** - * Returns field types for a given [[TableSource]]. - * - * @param tableSource The TableSource to extract field types from. - * @tparam A The type of the TableSource. - * @return An array holding the field types. - */ - def getFieldTypes[A](tableSource: TableSource[A]): Array[TypeInformation[_]] = { - val returnType = tableSource.getReturnType - TableEnvironment.getFieldTypes(returnType) - } - - /** * Returns field names for a given [[TableSource]]. * * @param tableSource The TableSource to extract field names from. http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RexNodeWrapper.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RexNodeWrapper.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RexNodeWrapper.scala deleted file mode 100644 index 1926a67..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RexNodeWrapper.scala +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.calcite - -import org.apache.calcite.rex._ -import org.apache.calcite.sql._ -import org.apache.flink.table.api.TableException -import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference} -import org.apache.flink.table.validate.FunctionCatalog -import org.apache.flink.table.calcite.RexNodeWrapper._ - -abstract class RexNodeWrapper(rex: RexNode) { - def get: RexNode = rex - def toExpression(names: Map[RexInputRef, String]): Expression -} - -case class RexLiteralWrapper(literal: RexLiteral) extends RexNodeWrapper(literal) { - override def toExpression(names: Map[RexInputRef, String]): Expression = { - val typeInfo = FlinkTypeFactory.toTypeInfo(literal.getType) - Literal(literal.getValue, typeInfo) - } -} - -case class RexInputWrapper(input: RexInputRef) extends RexNodeWrapper(input) { - override def toExpression(names: Map[RexInputRef, String]): Expression = { - val typeInfo = FlinkTypeFactory.toTypeInfo(input.getType) - ResolvedFieldReference(names(input), typeInfo) - } -} - -case class RexCallWrapper( - call: RexCall, - operands: Seq[RexNodeWrapper]) extends RexNodeWrapper(call) { - - override def toExpression(names: Map[RexInputRef, String]): Expression = { - val ops = operands.map(_.toExpression(names)) - 
call.op match { - case function: SqlFunction => - lookupFunction(replace(function.getName), ops) - case postfix: SqlPostfixOperator => - lookupFunction(replace(postfix.getName), ops) - case operator@_ => - val name = replace(s"${operator.kind}") - lookupFunction(name, ops) - } - } - - def replace(str: String): String = { - str.replaceAll("\\s|_", "") - } -} - -object RexNodeWrapper { - - private var catalog: Option[FunctionCatalog] = None - - def wrap(rex: RexNode, functionCatalog: FunctionCatalog): RexNodeWrapper = { - catalog = Option(functionCatalog) - rex.accept(new WrapperVisitor) - } - - private[table] def lookupFunction(name: String, operands: Seq[Expression]): Expression = { - catalog.getOrElse(throw TableException("FunctionCatalog was not defined")) - .lookupFunction(name, operands) - } -} - -class WrapperVisitor extends RexVisitorImpl[RexNodeWrapper](true) { - - override def visitInputRef(inputRef: RexInputRef): RexNodeWrapper = { - RexInputWrapper(inputRef) - } - - override def visitLiteral(literal: RexLiteral): RexNodeWrapper = { - RexLiteralWrapper(literal) - } - - override def visitLocalRef(localRef: RexLocalRef): RexNodeWrapper = { - localRef.accept(this) - } - - override def visitCall(call: RexCall): RexNodeWrapper = { - val operands = for { - x <- 0 until call.operands.size() - } yield { - call.operands.get(x).accept(this) - } - RexCallWrapper(call, operands) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala index 8b07aac..bc25140 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala @@ -167,7 +167,8 @@ trait CommonCalc { case _ => true } - planner.getCostFactory.makeCost(rowCnt, rowCnt * compCnt, 0) + val newRowCnt = estimateRowCount(calcProgram, rowCnt) + planner.getCostFactory.makeCost(newRowCnt, newRowCnt * compCnt, 0) } private[flink] def estimateRowCount( http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/TableSourceScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/TableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/TableSourceScan.scala new file mode 100644 index 0000000..e0f7786 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/TableSourceScan.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes + +import org.apache.calcite.plan.{RelOptCluster, RelOptTable, RelTraitSet} +import org.apache.calcite.rel.RelWriter +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.TableScan +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.sources.TableSource + +import scala.collection.JavaConverters._ + +abstract class TableSourceScan( + cluster: RelOptCluster, + traitSet: RelTraitSet, + table: RelOptTable, + val tableSource: TableSource[_]) + extends TableScan(cluster, traitSet, table) { + + override def deriveRowType(): RelDataType = { + val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] + flinkTypeFactory.buildRowDataType( + TableEnvironment.getFieldNames(tableSource), + TableEnvironment.getFieldTypes(tableSource.getReturnType)) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + val terms = super.explainTerms(pw) + .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", ")) + + val sourceDesc = tableSource.explainSource() + if (sourceDesc.nonEmpty) { + terms.item("source", sourceDesc) + } else { + terms + } + } + + override def toString: String = { + s"Source(from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))" + } + + def copy(traitSet: RelTraitSet, tableSource: TableSource[_]): TableSourceScan + +} http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala index 09262a6..b39b8ed 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala @@ -18,9 +18,6 @@ package org.apache.flink.table.plan.nodes.dataset -import org.apache.calcite.plan._ -import org.apache.calcite.rel.core.TableScan -import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.flink.api.java.DataSet import org.apache.flink.table.api.TableConfig import org.apache.flink.table.calcite.FlinkTypeFactory @@ -31,23 +28,7 @@ import org.apache.flink.types.Row import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ -abstract class BatchScan( - cluster: RelOptCluster, - traitSet: RelTraitSet, - table: RelOptTable) - extends TableScan(cluster, traitSet, table) - with CommonScan - with DataSetRel { - - override def toString: String = { - s"Source(from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))" - } - - override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { - - val rowCnt = metadata.getRowCount(this) - planner.getCostFactory.makeCost(rowCnt, rowCnt, 0) - } +trait BatchScan extends CommonScan with DataSetRel { protected def convertToInternalRow( input: DataSet[Any], http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala index 11f595c..a9784e2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala @@ -19,33 +19,25 @@ package org.apache.flink.table.plan.nodes.dataset import org.apache.calcite.plan._ +import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.metadata.RelMetadataQuery -import org.apache.calcite.rel.{RelNode, RelWriter} -import org.apache.calcite.rex.RexNode import org.apache.flink.api.java.DataSet -import org.apache.flink.table.api.{BatchTableEnvironment, TableEnvironment} -import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.api.BatchTableEnvironment +import org.apache.flink.table.plan.nodes.TableSourceScan import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.sources.{BatchTableSource, TableSource} import org.apache.flink.types.Row -import org.apache.flink.table.sources.BatchTableSource /** Flink RelNode to read data from an external source defined by a [[BatchTableSource]]. 
*/ class BatchTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, table: RelOptTable, - val tableSource: BatchTableSource[_], - filterCondition: RexNode = null) - extends BatchScan(cluster, traitSet, table) { + tableSource: BatchTableSource[_]) + extends TableSourceScan(cluster, traitSet, table, tableSource) + with BatchScan { - override def deriveRowType() = { - val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] - flinkTypeFactory.buildRowDataType( - TableEnvironment.getFieldNames(tableSource), - TableEnvironment.getFieldTypes(tableSource.getReturnType)) - } - - override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { + override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { val rowCnt = metadata.getRowCount(this) planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType)) } @@ -55,27 +47,22 @@ class BatchTableSourceScan( cluster, traitSet, getTable, - tableSource, - filterCondition + tableSource ) } - override def explainTerms(pw: RelWriter): RelWriter = { - val terms = super.explainTerms(pw) - .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", ")) - if (filterCondition != null) { - import scala.collection.JavaConverters._ - val fieldNames = getTable.getRowType.getFieldNames.asScala.toList - terms.item("filter", getExpressionString(filterCondition, fieldNames, None)) - } - terms + override def copy(traitSet: RelTraitSet, newTableSource: TableSource[_]): TableSourceScan = { + new BatchTableSourceScan( + cluster, + traitSet, + getTable, + newTableSource.asInstanceOf[BatchTableSource[_]] + ) } override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = { - val config = tableEnv.getConfig val inputDataSet = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]] - - convertToInternalRow(inputDataSet, new TableSourceTable(tableSource, tableEnv), config) + 
convertToInternalRow(inputDataSet, new TableSourceTable(tableSource), config) } } http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala index 972e45b..e05b5a8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala @@ -20,8 +20,9 @@ package org.apache.flink.table.plan.nodes.dataset import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.Calc import org.apache.calcite.rel.metadata.RelMetadataQuery -import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.calcite.rel.{RelNode, RelWriter} import org.apache.calcite.rex._ import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.api.java.DataSet @@ -40,35 +41,29 @@ class DataSetCalc( traitSet: RelTraitSet, input: RelNode, rowRelDataType: RelDataType, - private[flink] val calcProgram: RexProgram, // for tests + calcProgram: RexProgram, ruleDescription: String) - extends SingleRel(cluster, traitSet, input) + extends Calc(cluster, traitSet, input, calcProgram) with CommonCalc with DataSetRel { - override def deriveRowType() = rowRelDataType + override def deriveRowType(): RelDataType = rowRelDataType - override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { - new DataSetCalc( - cluster, - traitSet, - inputs.get(0), - getRowType, - calcProgram, - ruleDescription) + 
override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = { + new DataSetCalc(cluster, traitSet, child, getRowType, program, ruleDescription) } override def toString: String = calcToString(calcProgram, getExpressionString) override def explainTerms(pw: RelWriter): RelWriter = { - super.explainTerms(pw) + pw.input("input", getInput) .item("select", selectionToString(calcProgram, getExpressionString)) .itemIf("where", conditionToString(calcProgram, getExpressionString), calcProgram.getCondition != null) } - override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { + override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { val child = this.getInput val rowCnt = metadata.getRowCount(child) http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala index 44d2d00..b1cf106 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala @@ -21,6 +21,8 @@ package org.apache.flink.table.plan.nodes.dataset import org.apache.calcite.plan._ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.flink.api.java.DataSet import org.apache.flink.table.api.BatchTableEnvironment import org.apache.flink.table.plan.schema.DataSetTable @@ -36,11 +38,17 @@ class DataSetScan( traitSet: 
RelTraitSet, table: RelOptTable, rowRelDataType: RelDataType) - extends BatchScan(cluster, traitSet, table) { + extends TableScan(cluster, traitSet, table) + with BatchScan { val dataSetTable: DataSetTable[Any] = getTable.unwrap(classOf[DataSetTable[Any]]) - override def deriveRowType() = rowRelDataType + override def deriveRowType(): RelDataType = rowRelDataType + + override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { + val rowCnt = metadata.getRowCount(this) + planner.getCostFactory.makeCost(rowCnt, rowCnt, 0) + } override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataSetScan( @@ -52,10 +60,8 @@ class DataSetScan( } override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = { - val config = tableEnv.getConfig val inputDataSet: DataSet[Any] = dataSetTable.dataSet - convertToInternalRow(inputDataSet, dataSetTable, config) } http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala index 26778d7..b015a1d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala @@ -20,8 +20,9 @@ package org.apache.flink.table.plan.nodes.datastream import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.Calc import org.apache.calcite.rel.metadata.RelMetadataQuery -import 
org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.calcite.rel.{RelNode, RelWriter} import org.apache.calcite.rex.RexProgram import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.streaming.api.datastream.DataStream @@ -40,36 +41,29 @@ class DataStreamCalc( traitSet: RelTraitSet, input: RelNode, rowRelDataType: RelDataType, - private[flink] val calcProgram: RexProgram, + calcProgram: RexProgram, ruleDescription: String) - extends SingleRel(cluster, traitSet, input) + extends Calc(cluster, traitSet, input, calcProgram) with CommonCalc with DataStreamRel { - override def deriveRowType() = rowRelDataType + override def deriveRowType(): RelDataType = rowRelDataType - override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { - new DataStreamCalc( - cluster, - traitSet, - inputs.get(0), - getRowType, - calcProgram, - ruleDescription - ) + override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = { + new DataStreamCalc(cluster, traitSet, child, getRowType, program, ruleDescription) } override def toString: String = calcToString(calcProgram, getExpressionString) override def explainTerms(pw: RelWriter): RelWriter = { - super.explainTerms(pw) + pw.input("input", getInput) .item("select", selectionToString(calcProgram, getExpressionString)) .itemIf("where", conditionToString(calcProgram, getExpressionString), calcProgram.getCondition != null) } - override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { + override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { val child = this.getInput val rowCnt = metadata.getRowCount(child) computeSelfCost(calcProgram, planner, rowCnt) http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala index e8d218e..c187ae8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala @@ -21,6 +21,7 @@ package org.apache.flink.table.plan.nodes.datastream import org.apache.calcite.plan._ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.TableScan import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.StreamTableEnvironment import org.apache.flink.table.plan.schema.DataStreamTable @@ -36,11 +37,12 @@ class DataStreamScan( traitSet: RelTraitSet, table: RelOptTable, rowRelDataType: RelDataType) - extends StreamScan(cluster, traitSet, table) { + extends TableScan(cluster, traitSet, table) + with StreamScan { val dataStreamTable: DataStreamTable[Any] = getTable.unwrap(classOf[DataStreamTable[Any]]) - override def deriveRowType() = rowRelDataType + override def deriveRowType(): RelDataType = rowRelDataType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataStreamScan( @@ -52,10 +54,8 @@ class DataStreamScan( } override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { - val config = tableEnv.getConfig val inputDataStream: DataStream[Any] = dataStreamTable.dataStream - convertToInternalRow(inputDataStream, dataStreamTable, config) } http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala index 56f7f27..6d08302 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala @@ -18,8 +18,6 @@ package org.apache.flink.table.plan.nodes.datastream -import org.apache.calcite.plan._ -import org.apache.calcite.rel.core.TableScan import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.TableConfig import org.apache.flink.table.calcite.FlinkTypeFactory @@ -30,13 +28,7 @@ import org.apache.flink.types.Row import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ -abstract class StreamScan( - cluster: RelOptCluster, - traitSet: RelTraitSet, - table: RelOptTable) - extends TableScan(cluster, traitSet, table) - with CommonScan - with DataStreamRel { +trait StreamScan extends CommonScan with DataStreamRel { protected def convertToInternalRow( input: DataStream[Any], http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala index b808d8d..013c55f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala @@ -19,33 +19,25 @@ package org.apache.flink.table.plan.nodes.datastream import org.apache.calcite.plan._ +import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.metadata.RelMetadataQuery -import org.apache.calcite.rel.{RelNode, RelWriter} -import org.apache.calcite.rex.RexNode -import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.plan.nodes.TableSourceScan import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.sources.{StreamTableSource, TableSource} import org.apache.flink.types.Row -import org.apache.flink.table.sources.StreamTableSource -import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment} /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. 
*/ class StreamTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, table: RelOptTable, - val tableSource: StreamTableSource[_], - filterCondition: RexNode = null) - extends StreamScan(cluster, traitSet, table) { - - override def deriveRowType() = { - val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] - flinkTypeFactory.buildRowDataType( - TableEnvironment.getFieldNames(tableSource), - TableEnvironment.getFieldTypes(tableSource.getReturnType)) - } + tableSource: StreamTableSource[_]) + extends TableSourceScan(cluster, traitSet, table, tableSource) + with StreamScan { - override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { + override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { val rowCnt = metadata.getRowCount(this) planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType)) } @@ -55,28 +47,22 @@ class StreamTableSourceScan( cluster, traitSet, getTable, - tableSource, - filterCondition + tableSource ) } - override def explainTerms(pw: RelWriter): RelWriter = { - val terms = super.explainTerms(pw) - .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", ")) - if (filterCondition != null) { - import scala.collection.JavaConverters._ - val fieldNames = getTable.getRowType.getFieldNames.asScala.toList - terms.item("filter", getExpressionString(filterCondition, fieldNames, None)) - } - terms + override def copy(traitSet: RelTraitSet, newTableSource: TableSource[_]): TableSourceScan = { + new StreamTableSourceScan( + cluster, + traitSet, + getTable, + newTableSource.asInstanceOf[StreamTableSource[_]] + ) } override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { - val config = tableEnv.getConfig - val inputDataStream: DataStream[Any] = tableSource - .getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]] - - convertToInternalRow(inputDataStream, new 
TableSourceTable(tableSource, tableEnv), config) + val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]] + convertToInternalRow(inputDataStream, new TableSourceTable(tableSource), config) } } http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala new file mode 100644 index 0000000..b07f78e --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.common + +import java.util + +import org.apache.calcite.plan.RelOptRuleCall +import org.apache.calcite.rel.core.Calc +import org.apache.calcite.rex.RexProgram +import org.apache.flink.table.expressions.Expression +import org.apache.flink.table.plan.nodes.TableSourceScan +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.plan.util.RexProgramExtractor +import org.apache.flink.table.sources.FilterableTableSource +import org.apache.flink.table.validate.FunctionCatalog +import org.apache.flink.util.Preconditions + +import scala.collection.JavaConverters._ + +trait PushFilterIntoTableSourceScanRuleBase { + + private[flink] def pushFilterIntoScan( + call: RelOptRuleCall, + calc: Calc, + scan: TableSourceScan, + tableSourceTable: TableSourceTable[_], + filterableSource: FilterableTableSource[_], + description: String): Unit = { + + Preconditions.checkArgument(!filterableSource.isFilterPushedDown) + + val program = calc.getProgram + val functionCatalog = FunctionCatalog.withBuiltIns + val (predicates, unconvertedRexNodes) = + RexProgramExtractor.extractConjunctiveConditions( + program, + call.builder().getRexBuilder, + functionCatalog) + if (predicates.isEmpty) { + // no condition can be translated to expression + return + } + + val remainingPredicates = new util.LinkedList[Expression]() + predicates.foreach(e => remainingPredicates.add(e)) + + val newTableSource = filterableSource.applyPredicate(remainingPredicates) + + // check whether framework still need to do a filter + val relBuilder = call.builder() + val remainingCondition = { + if (!remainingPredicates.isEmpty || unconvertedRexNodes.nonEmpty) { + relBuilder.push(scan) + val remainingConditions = + (remainingPredicates.asScala.map(expr => expr.toRexNode(relBuilder)) + ++ unconvertedRexNodes) + remainingConditions.reduce((l, r) => relBuilder.and(l, r)) + } else { + null + } + } + + // check whether we still need a RexProgram. 
An RexProgram is needed when either + // projection or filter exists. + val newScan = scan.copy(scan.getTraitSet, newTableSource) + val newRexProgram = { + if (remainingCondition != null || !program.projectsOnlyIdentity) { + val expandedProjectList = program.getProjectList.asScala + .map(ref => program.expandLocalRef(ref)).asJava + RexProgram.create( + program.getInputRowType, + expandedProjectList, + remainingCondition, + program.getOutputRowType, + relBuilder.getRexBuilder) + } else { + null + } + } + + if (newRexProgram != null) { + val newCalc = calc.copy(calc.getTraitSet, newScan, newRexProgram) + call.transformTo(newCalc) + } else { + call.transformTo(newScan) + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala new file mode 100644 index 0000000..9f9c805 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.common + +import org.apache.calcite.plan.RelOptRuleCall +import org.apache.calcite.rel.core.Calc +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.plan.nodes.TableSourceScan +import org.apache.flink.table.plan.util.{RexProgramExtractor, RexProgramRewriter} +import org.apache.flink.table.sources.ProjectableTableSource + +trait PushProjectIntoTableSourceScanRuleBase { + + private[flink] def pushProjectIntoScan( + call: RelOptRuleCall, + calc: Calc, + scan: TableSourceScan): Unit = { + + val usedFields = RexProgramExtractor.extractRefInputFields(calc.getProgram) + + // if no fields can be projected, we keep the original plan. 
+ if (TableEnvironment.getFieldNames(scan.tableSource).length != usedFields.length) { + val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]] + val newTableSource = originTableSource.projectFields(usedFields) + val newScan = scan.copy(scan.getTraitSet, newTableSource) + val newCalcProgram = RexProgramRewriter.rewriteWithFieldProjection( + calc.getProgram, + newScan.getRowType, + calc.getCluster.getRexBuilder, + usedFields) + + if (newCalcProgram.isTrivial) { + // drop calc if the transformed program merely returns its input and doesn't exist filter + call.transformTo(newScan) + } else { + val newCalc = calc.copy(calc.getTraitSet, newScan, newCalcProgram) + call.transformTo(newCalc) + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala index f95e34e..8cfd748 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala @@ -20,23 +20,23 @@ package org.apache.flink.table.plan.rules.dataSet import org.apache.calcite.plan.RelOptRule._ import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} -import org.apache.calcite.rex.RexProgram import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc} -import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._ +import 
org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase import org.apache.flink.table.plan.schema.TableSourceTable import org.apache.flink.table.sources.FilterableTableSource class PushFilterIntoBatchTableSourceScanRule extends RelOptRule( operand(classOf[DataSetCalc], operand(classOf[BatchTableSourceScan], none)), - "PushFilterIntoBatchTableSourceScanRule") { + "PushFilterIntoBatchTableSourceScanRule") + with PushFilterIntoTableSourceScanRuleBase { - override def matches(call: RelOptRuleCall) = { + override def matches(call: RelOptRuleCall): Boolean = { val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc] val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan] scan.tableSource match { - case _: FilterableTableSource => - calc.calcProgram.getCondition != null + case source: FilterableTableSource[_] => + calc.getProgram.getCondition != null && !source.isFilterPushedDown case _ => false } } @@ -44,49 +44,9 @@ class PushFilterIntoBatchTableSourceScanRule extends RelOptRule( override def onMatch(call: RelOptRuleCall): Unit = { val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc] val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan] - - val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource] - - val program: RexProgram = calc.calcProgram - val tst = scan.getTable.unwrap(classOf[TableSourceTable[_]]) - val predicate = extractPredicateExpressions( - program, - call.builder().getRexBuilder, - tst.tableEnv.getFunctionCatalog) - - if (predicate.length != 0) { - val remainingPredicate = filterableSource.setPredicate(predicate) - - if (verifyExpressions(predicate, remainingPredicate)) { - - val filterRexNode = getFilterExpressionAsRexNode( - program.getInputRowType, - scan, - predicate.diff(remainingPredicate))(call.builder()) - - val newScan = new BatchTableSourceScan( - scan.getCluster, - scan.getTraitSet, - scan.getTable, - scan.tableSource, - filterRexNode) - - 
val newCalcProgram = rewriteRexProgram( - program, - newScan, - remainingPredicate)(call.builder()) - - val newCalc = new DataSetCalc( - calc.getCluster, - calc.getTraitSet, - newScan, - calc.getRowType, - newCalcProgram, - description) - - call.transformTo(newCalc) - } - } + val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable[_]]) + val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource[_]] + pushFilterIntoScan(call, calc, scan, tableSourceTable, filterableSource, description) } } http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala index 53f5fff..8c83047 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala @@ -18,22 +18,22 @@ package org.apache.flink.table.plan.rules.dataSet -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} import org.apache.calcite.plan.RelOptRule.{none, operand} -import org.apache.flink.table.api.TableEnvironment +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc} -import org.apache.flink.table.plan.util.RexProgramProjectExtractor._ -import org.apache.flink.table.sources.{BatchTableSource, ProjectableTableSource} +import org.apache.flink.table.plan.rules.common.PushProjectIntoTableSourceScanRuleBase 
+import org.apache.flink.table.sources.ProjectableTableSource /** * This rule tries to push projections into a BatchTableSourceScan. */ class PushProjectIntoBatchTableSourceScanRule extends RelOptRule( operand(classOf[DataSetCalc], - operand(classOf[BatchTableSourceScan], none)), - "PushProjectIntoBatchTableSourceScanRule") { + operand(classOf[BatchTableSourceScan], none)), + "PushProjectIntoBatchTableSourceScanRule") + with PushProjectIntoTableSourceScanRuleBase { - override def matches(call: RelOptRuleCall) = { + override def matches(call: RelOptRuleCall): Boolean = { val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan] scan.tableSource match { case _: ProjectableTableSource[_] => true @@ -44,39 +44,7 @@ class PushProjectIntoBatchTableSourceScanRule extends RelOptRule( override def onMatch(call: RelOptRuleCall) { val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc] val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan] - - val usedFields: Array[Int] = extractRefInputFields(calc.calcProgram) - - // if no fields can be projected, we keep the original plan. 
- if (TableEnvironment.getFieldNames(scan.tableSource).length != usedFields.length) { - val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]] - val newTableSource = originTableSource.projectFields(usedFields) - val newScan = new BatchTableSourceScan( - scan.getCluster, - scan.getTraitSet, - scan.getTable, - newTableSource.asInstanceOf[BatchTableSource[_]]) - - val newCalcProgram = rewriteRexProgram( - calc.calcProgram, - newScan.getRowType, - usedFields, - calc.getCluster.getRexBuilder) - - if (newCalcProgram.isTrivial) { - // drop calc if the transformed program merely returns its input and doesn't exist filter - call.transformTo(newScan) - } else { - val newCalc = new DataSetCalc( - calc.getCluster, - calc.getTraitSet, - newScan, - calc.getRowType, - newCalcProgram, - description) - call.transformTo(newCalc) - } - } + pushProjectIntoScan(call, calc, scan) } } http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala index 9c02dd7..53a3bcd 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala @@ -21,21 +21,22 @@ package org.apache.flink.table.plan.rules.datastream import org.apache.calcite.plan.RelOptRule._ import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} import 
org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan} -import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._ +import org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase import org.apache.flink.table.plan.schema.TableSourceTable import org.apache.flink.table.sources.FilterableTableSource class PushFilterIntoStreamTableSourceScanRule extends RelOptRule( operand(classOf[DataStreamCalc], operand(classOf[StreamTableSourceScan], none)), - "PushFilterIntoStreamTableSourceScanRule") { + "PushFilterIntoStreamTableSourceScanRule") + with PushFilterIntoTableSourceScanRuleBase { - override def matches(call: RelOptRuleCall) = { + override def matches(call: RelOptRuleCall): Boolean = { val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc] val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan] scan.tableSource match { - case _: FilterableTableSource => - calc.calcProgram.getCondition != null + case source: FilterableTableSource[_] => + calc.getProgram.getCondition != null && !source.isFilterPushedDown case _ => false } } @@ -43,51 +44,10 @@ class PushFilterIntoStreamTableSourceScanRule extends RelOptRule( override def onMatch(call: RelOptRuleCall): Unit = { val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc] val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan] - - val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource] - - val program = calc.calcProgram - val tst = scan.getTable.unwrap(classOf[TableSourceTable[_]]) - val predicates = extractPredicateExpressions( - program, - call.builder().getRexBuilder, - tst.tableEnv.getFunctionCatalog) - - if (predicates.length != 0) { - val remainingPredicate = filterableSource.setPredicate(predicates) - - if (verifyExpressions(predicates, remainingPredicate)) { - - val filterRexNode = getFilterExpressionAsRexNode( - program.getInputRowType, - scan, - 
predicates.diff(remainingPredicate))(call.builder()) - - val newScan = new StreamTableSourceScan( - scan.getCluster, - scan.getTraitSet, - scan.getTable, - scan.tableSource, - filterRexNode) - - val newCalcProgram = rewriteRexProgram( - program, - newScan, - remainingPredicate)(call.builder()) - - val newCalc = new DataStreamCalc( - calc.getCluster, - calc.getTraitSet, - newScan, - calc.getRowType, - newCalcProgram, - description) - - call.transformTo(newCalc) - } - } + val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable[_]]) + val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource[_]] + pushFilterIntoScan(call, calc, scan, tableSourceTable, filterableSource, description) } - } object PushFilterIntoStreamTableSourceScanRule { http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala index 0c20f2a..903162e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala @@ -20,9 +20,8 @@ package org.apache.flink.table.plan.rules.datastream import org.apache.calcite.plan.RelOptRule._ import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} -import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan} -import 
org.apache.flink.table.plan.util.RexProgramProjectExtractor._ +import org.apache.flink.table.plan.rules.common.PushProjectIntoTableSourceScanRuleBase import org.apache.flink.table.sources.{ProjectableTableSource, StreamTableSource} /** @@ -31,7 +30,8 @@ import org.apache.flink.table.sources.{ProjectableTableSource, StreamTableSource class PushProjectIntoStreamTableSourceScanRule extends RelOptRule( operand(classOf[DataStreamCalc], operand(classOf[StreamTableSourceScan], none())), - "PushProjectIntoStreamTableSourceScanRule") { + "PushProjectIntoStreamTableSourceScanRule") + with PushProjectIntoTableSourceScanRuleBase { /** Rule must only match if [[StreamTableSource]] targets a [[ProjectableTableSource]] */ override def matches(call: RelOptRuleCall): Boolean = { @@ -45,39 +45,7 @@ class PushProjectIntoStreamTableSourceScanRule extends RelOptRule( override def onMatch(call: RelOptRuleCall): Unit = { val calc = call.rel(0).asInstanceOf[DataStreamCalc] val scan = call.rel(1).asInstanceOf[StreamTableSourceScan] - - val usedFields = extractRefInputFields(calc.calcProgram) - - // if no fields can be projected, we keep the original plan - if (TableEnvironment.getFieldNames(scan.tableSource).length != usedFields.length) { - val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]] - val newTableSource = originTableSource.projectFields(usedFields) - val newScan = new StreamTableSourceScan( - scan.getCluster, - scan.getTraitSet, - scan.getTable, - newTableSource.asInstanceOf[StreamTableSource[_]]) - - val newProgram = rewriteRexProgram( - calc.calcProgram, - newScan.getRowType, - usedFields, - calc.getCluster.getRexBuilder) - - if (newProgram.isTrivial) { - // drop calc if the transformed program merely returns its input and doesn't exist filter - call.transformTo(newScan) - } else { - val newCalc = new DataStreamCalc( - calc.getCluster, - calc.getTraitSet, - newScan, - calc.getRowType, - newProgram, - description) - call.transformTo(newCalc) - } - } + 
pushProjectIntoScan(call, calc, scan) } } http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala index faf5efc..a3851e3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala @@ -25,7 +25,6 @@ import org.apache.flink.table.sources.TableSource /** Table which defines an external table via a [[TableSource]] */ class TableSourceTable[T]( val tableSource: TableSource[T], - val tableEnv: TableEnvironment, override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN) extends FlinkTable[T]( typeInfo = tableSource.getReturnType, http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala deleted file mode 100644 index 337b3de..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.plan.util - -import org.apache.calcite.rel.core.TableScan -import org.apache.calcite.rex._ -import org.apache.calcite.plan.RelOptUtil -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.tools.RelBuilder -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.table.calcite.{FlinkTypeFactory, RexNodeWrapper} -import org.apache.flink.table.expressions._ -import org.apache.flink.table.validate.FunctionCatalog - -import scala.collection.JavaConversions._ -import scala.collection.JavaConverters._ -import scala.collection.immutable.IndexedSeq - -object RexProgramExpressionExtractor { - - /** - * converts a rexProgram condition into independent CNF expressions - * - * @param rexProgram The RexProgram to analyze - * @return converted expression - */ - private[flink] def extractPredicateExpressions( - rexProgram: RexProgram, - rexBuilder: RexBuilder, - catalog: FunctionCatalog): Array[Expression] = { - - val fieldNames = getInputsWithNames(rexProgram) - - val condition = rexProgram.getCondition - if (condition == null) { - return Array.empty - } - val call = rexProgram.expandLocalRef(condition) - val cnf = RexUtil.toCnf(rexBuilder, call) - val conjunctions = RelOptUtil.conjunctions(cnf) - val expressions = conjunctions.asScala.map( - RexNodeWrapper.wrap(_, catalog).toExpression(fieldNames) - ) - 
expressions.toArray - } - - /** - * verify should we apply remained expressions on - * - * @param original initial expression - * @param remained remained part of original expression - * @return whether or not to decouple parts of the origin expression - */ - private[flink] def verifyExpressions( - original: Array[Expression], - remained: Array[Expression]): Boolean = - remained forall (original contains) - - /** - * Generates a new RexProgram based on new expression. - * - * @param rexProgram original RexProgram - * @param scan input source - * @param predicate filter condition (fields must be resolved) - * @param relBuilder builder for converting expression to Rex - */ - private[flink] def rewriteRexProgram( - rexProgram: RexProgram, - scan: TableScan, - predicate: Array[Expression])(implicit relBuilder: RelBuilder): RexProgram = { - - relBuilder.push(scan) - - val inType = rexProgram.getInputRowType - val resolvedExps = resolveFields(predicate, inType) - val projs = rexProgram.getProjectList.map(rexProgram.expandLocalRef) - - RexProgram.create( - inType, - projs, - conjunct(resolvedExps).get.toRexNode, - rexProgram.getOutputRowType, - relBuilder.getRexBuilder) - } - - private[flink] def getFilterExpressionAsRexNode( - inputTpe: RelDataType, - scan: TableScan, - exps: Array[Expression])(implicit relBuilder: RelBuilder): RexNode = { - relBuilder.push(scan) - val resolvedExps = resolveFields(exps, inputTpe) - val fullExp = conjunct(resolvedExps) - if (fullExp.isDefined) { - fullExp.get.toRexNode - } else { - null - } - } - - private def resolveFields( - predicate: Array[Expression], - inType: RelDataType): Array[Expression] = { - val fieldTypes: Map[String, TypeInformation[_]] = inType.getFieldList - .map(f => f.getName -> FlinkTypeFactory.toTypeInfo(f.getType)) - .toMap - val rule: PartialFunction[Expression, Expression] = { - case u@UnresolvedFieldReference(name) => - ResolvedFieldReference(name, fieldTypes(name)) - } - predicate.map(_.postOrderTransform(rule)) - 
} - - private def conjunct(exps: Array[Expression]): Option[Expression] = { - def overIndexes(): IndexedSeq[Expression] = { - for { - i <- exps.indices by 2 - } yield { - if (i + 1 < exps.length) { - And(exps(i), exps(i + 1)) - } else { - exps(i) - } - } - } - exps.length match { - case 0 => - None - case 1 => - Option(exps(0)) - case _ => - conjunct(overIndexes().toArray) - } - } - - private def getInputsWithNames(rexProgram: RexProgram): Map[RexInputRef, String] = { - val names = rexProgram.getInputRowType.getFieldNames - - val buffer = for { - exp <- rexProgram.getExprList.asScala - if exp.isInstanceOf[RexInputRef] - ref = exp.asInstanceOf[RexInputRef] - } yield { - ref -> names(ref.getIndex) - } - buffer.toMap - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala new file mode 100644 index 0000000..433a35b --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.util + +import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rex._ +import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator} +import org.apache.flink.table.api.TableException +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference} +import org.apache.flink.table.validate.FunctionCatalog +import org.apache.flink.util.Preconditions + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.{Failure, Success, Try} + +object RexProgramExtractor { + + /** + * Extracts the indices of input fields which accessed by the RexProgram. + * + * @param rexProgram The RexProgram to analyze + * @return The indices of accessed input fields + */ + def extractRefInputFields(rexProgram: RexProgram): Array[Int] = { + val visitor = new InputRefVisitor + + // extract referenced input fields from projections + rexProgram.getProjectList.foreach( + exp => rexProgram.expandLocalRef(exp).accept(visitor)) + + // extract referenced input fields from condition + val condition = rexProgram.getCondition + if (condition != null) { + rexProgram.expandLocalRef(condition).accept(visitor) + } + + visitor.getFields + } + + /** + * Extract condition from RexProgram and convert it into independent CNF expressions. 
+ * + * @param rexProgram The RexProgram to analyze + * @return converted expressions as well as RexNodes which cannot be translated + */ + def extractConjunctiveConditions( + rexProgram: RexProgram, + rexBuilder: RexBuilder, + catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = { + + rexProgram.getCondition match { + case condition: RexLocalRef => + val expanded = rexProgram.expandLocalRef(condition) + // converts the expanded expression to conjunctive normal form, + // like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR c)" + val cnf = RexUtil.toCnf(rexBuilder, expanded) + // converts the cnf condition to a list of AND conditions + val conjunctions = RelOptUtil.conjunctions(cnf) + + val convertedExpressions = new mutable.ArrayBuffer[Expression] + val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode] + val inputNames = rexProgram.getInputRowType.getFieldNames.asScala.toArray + val converter = new RexNodeToExpressionConverter(inputNames, catalog) + + conjunctions.asScala.foreach(rex => { + rex.accept(converter) match { + case Some(expression) => convertedExpressions += expression + case None => unconvertedRexNodes += rex + } + }) + (convertedExpressions.toArray, unconvertedRexNodes.toArray) + + case _ => (Array.empty, Array.empty) + } + } +} + +/** + * An RexVisitor to extract all referenced input fields + */ +class InputRefVisitor extends RexVisitorImpl[Unit](true) { + + private val fields = mutable.LinkedHashSet[Int]() + + def getFields: Array[Int] = fields.toArray + + override def visitInputRef(inputRef: RexInputRef): Unit = + fields += inputRef.getIndex + + override def visitCall(call: RexCall): Unit = + call.operands.foreach(operand => operand.accept(this)) +} + +/** + * An RexVisitor to convert RexNode to Expression. 
+ * + * @param inputNames The input names of the relation node + * @param functionCatalog The function catalog + */ +class RexNodeToExpressionConverter( + inputNames: Array[String], + functionCatalog: FunctionCatalog) + extends RexVisitor[Option[Expression]] { + + override def visitInputRef(inputRef: RexInputRef): Option[Expression] = { + Preconditions.checkArgument(inputRef.getIndex < inputNames.length) + Some(ResolvedFieldReference( + inputNames(inputRef.getIndex), + FlinkTypeFactory.toTypeInfo(inputRef.getType) + )) + } + + override def visitLocalRef(localRef: RexLocalRef): Option[Expression] = { + throw new TableException("Bug: RexLocalRef should have been expanded") + } + + override def visitLiteral(literal: RexLiteral): Option[Expression] = { + Some(Literal(literal.getValue, FlinkTypeFactory.toTypeInfo(literal.getType))) + } + + override def visitCall(call: RexCall): Option[Expression] = { + val operands = call.getOperands.map( + operand => operand.accept(this).orNull + ) + + // return null if we cannot translate all the operands of the call + if (operands.contains(null)) { + None + } else { + call.getOperator match { + case function: SqlFunction => + lookupFunction(replace(function.getName), operands) + case postfix: SqlPostfixOperator => + lookupFunction(replace(postfix.getName), operands) + case operator@_ => + lookupFunction(replace(s"${operator.getKind}"), operands) + } + } + } + + override def visitFieldAccess(fieldAccess: RexFieldAccess): Option[Expression] = None + + override def visitCorrelVariable(correlVariable: RexCorrelVariable): Option[Expression] = None + + override def visitRangeRef(rangeRef: RexRangeRef): Option[Expression] = None + + override def visitSubQuery(subQuery: RexSubQuery): Option[Expression] = None + + override def visitDynamicParam(dynamicParam: RexDynamicParam): Option[Expression] = None + + override def visitOver(over: RexOver): Option[Expression] = None + + private def lookupFunction(name: String, operands: Seq[Expression]): 
Option[Expression] = { + Try(functionCatalog.lookupFunction(name, operands)) match { + case Success(expr) => Some(expr) + case Failure(_) => None + } + } + + private def replace(str: String): String = { + str.replaceAll("\\s|_", "") + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractor.scala deleted file mode 100644 index 1198167..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramProjectExtractor.scala +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.table.plan.util - -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rex._ - -import scala.collection.JavaConversions._ -import scala.collection.mutable -import scala.collection.JavaConverters._ - -object RexProgramProjectExtractor { - - /** - * Extracts the indexes of input fields accessed by the RexProgram. - * - * @param rexProgram The RexProgram to analyze - * @return The indexes of accessed input fields - */ - def extractRefInputFields(rexProgram: RexProgram): Array[Int] = { - val visitor = new RefFieldsVisitor - // extract input fields from project expressions - rexProgram.getProjectList.foreach(exp => rexProgram.expandLocalRef(exp).accept(visitor)) - val condition = rexProgram.getCondition - // extract input fields from condition expression - if (condition != null) { - rexProgram.expandLocalRef(condition).accept(visitor) - } - visitor.getFields - } - - /** - * Generates a new RexProgram based on mapped input fields. - * - * @param rexProgram original RexProgram - * @param inputRowType input row type - * @param usedInputFields indexes of used input fields - * @param rexBuilder builder for Rex expressions - * - * @return A RexProgram with mapped input field expressions. 
- */ - def rewriteRexProgram( - rexProgram: RexProgram, - inputRowType: RelDataType, - usedInputFields: Array[Int], - rexBuilder: RexBuilder): RexProgram = { - - val inputRewriter = new InputRewriter(usedInputFields) - val newProjectExpressions = rexProgram.getProjectList.map( - exp => rexProgram.expandLocalRef(exp).accept(inputRewriter) - ).toList.asJava - - val oldCondition = rexProgram.getCondition - val newConditionExpression = { - oldCondition match { - case ref: RexLocalRef => rexProgram.expandLocalRef(ref).accept(inputRewriter) - case _ => null // null does not match any type - } - } - RexProgram.create( - inputRowType, - newProjectExpressions, - newConditionExpression, - rexProgram.getOutputRowType, - rexBuilder - ) - } -} - -/** - * A RexVisitor to extract used input fields - */ -class RefFieldsVisitor extends RexVisitorImpl[Unit](true) { - private var fields = mutable.LinkedHashSet[Int]() - - def getFields: Array[Int] = fields.toArray - - override def visitInputRef(inputRef: RexInputRef): Unit = fields += inputRef.getIndex - - override def visitCall(call: RexCall): Unit = - call.operands.foreach(operand => operand.accept(this)) -} - -/** - * A RexShuttle to rewrite field accesses of a RexProgram. 
- * - * @param fields fields mapping - */ -class InputRewriter(fields: Array[Int]) extends RexShuttle { - - /** old input fields ref index -> new input fields ref index mappings */ - private val fieldMap: Map[Int, Int] = - fields.zipWithIndex.toMap - - override def visitInputRef(inputRef: RexInputRef): RexNode = - new RexInputRef(relNodeIndex(inputRef), inputRef.getType) - - override def visitLocalRef(localRef: RexLocalRef): RexNode = - new RexInputRef(relNodeIndex(localRef), localRef.getType) - - private def relNodeIndex(ref: RexSlot): Int = - fieldMap.getOrElse(ref.getIndex, - throw new IllegalArgumentException("input field contains invalid index")) -} http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramRewriter.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramRewriter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramRewriter.scala new file mode 100644 index 0000000..c8bbf2d --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramRewriter.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.util + +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rex._ + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +object RexProgramRewriter { + + /** + * Generates a new RexProgram with used input fields. The used fields maybe + * a subset of total input fields, so we need to convert the field index in + * new RexProgram based on given fields. + * + * @param rexProgram original RexProgram + * @param inputRowType input row type + * @param rexBuilder builder for Rex expressions + * @param usedFields indices of used input fields + * @return A new RexProgram with only used input fields + */ + def rewriteWithFieldProjection( + rexProgram: RexProgram, + inputRowType: RelDataType, + rexBuilder: RexBuilder, + usedFields: Array[Int]): RexProgram = { + + val inputRewriter = new InputRewriter(usedFields) + + // rewrite input field in projections + val newProjectExpressions = rexProgram.getProjectList.map( + exp => rexProgram.expandLocalRef(exp).accept(inputRewriter) + ).toList.asJava + + // rewrite input field in condition + val newConditionExpression = { + rexProgram.getCondition match { + case ref: RexLocalRef => rexProgram.expandLocalRef(ref).accept(inputRewriter) + case _ => null // null does not match any type + } + } + + RexProgram.create( + inputRowType, + newProjectExpressions, + newConditionExpression, + rexProgram.getOutputRowType, + rexBuilder + ) + } +} + +/** + * A RexShuttle to rewrite field accesses of a RexProgram. 
+ * + * @param fields used input fields + */ +class InputRewriter(fields: Array[Int]) extends RexShuttle { + + /** old input fields ref index -> new input fields ref index mappings */ + private val fieldMap: Map[Int, Int] = + fields.zipWithIndex.toMap + + override def visitInputRef(inputRef: RexInputRef): RexNode = + new RexInputRef(refNewIndex(inputRef), inputRef.getType) + + override def visitLocalRef(localRef: RexLocalRef): RexNode = + new RexInputRef(refNewIndex(localRef), localRef.getType) + + private def refNewIndex(ref: RexSlot): Int = + fieldMap.getOrElse(ref.getIndex, + throw new IllegalArgumentException("input field contains invalid index")) +} http://git-wip-us.apache.org/repos/asf/flink/blob/78f22aae/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala index bbbf862..67529a7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala @@ -18,21 +18,41 @@ package org.apache.flink.table.sources -import org.apache.flink.table.expressions.Expression +import java.util.{List => JList} +import org.apache.flink.table.expressions.Expression /** * Adds support for filtering push-down to a [[TableSource]]. - * A [[TableSource]] extending this interface is able to filter the fields of the return table. - * + * A [[TableSource]] extending this interface is able to filter records before returning. */ -trait FilterableTableSource { +trait FilterableTableSource[T] { - /** return an predicate expression that was set. 
*/ - def getPredicate: Array[Expression] + /** + * Check and pick all predicates this table source can support. The passed in predicates + * have been translated into conjunctive form, and the table source can only pick those + * predicates that it supports. + * <p> + * After trying to push predicates down, we should return a new [[TableSource]] + * instance which holds all pushed down predicates. Even if we actually pushed nothing down, + * it is recommended that we still return a new [[TableSource]] instance since we will + * mark the returned instance as one on which filter push down has been tried. + * <p> + * Note that the form of the predicates passed in must not be changed. They have been + * organized in conjunctive normal form (CNF), and we should only take or leave each element of + * the list. Don't try to reorganize the predicates unless you are absolutely confident about it. + * + * @param predicates A list containing conjunctive predicates; you should pick and remove all + * expressions that can be pushed down. The remaining elements of this list + * will be further evaluated by the framework. + * @return A new cloned instance of [[TableSource]] with or without any filters being + * pushed into it. + */ + def applyPredicate(predicates: JList[Expression]): TableSource[T] /** - * @param predicate a filter expression that will be applied to fields to return. - * @return an unsupported predicate expression. + * Return the flag to indicate whether filter push down has been tried. Must return true on + * the returned instance of [[applyPredicate]]. */ - def setPredicate(predicate: Array[Expression]): Array[Expression] + def isFilterPushedDown: Boolean + }
