This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
     new e1e62e3  [FLINK-13075][table-planner-blink] Project pushdown rule shouldn't require the TableSource return a modified schema in blink planner
e1e62e3 is described below

commit e1e62e3629a8b5055dbab70709b9790cda33fa30
Author: TsReaper <tsreape...@gmail.com>
AuthorDate: Mon Jul 22 17:47:40 2019 +0800

    [FLINK-13075][table-planner-blink] Project pushdown rule shouldn't require the TableSource return a modified schema in blink planner

    This closes #9197
---
 .../physical/batch/BatchExecTableSourceScan.scala      |  6 +++---
 .../physical/stream/StreamExecTableSourceScan.scala    |  8 ++++----
 .../flink/table/plan/rules/FlinkBatchRuleSets.scala    |  2 +-
 .../logical/PushProjectIntoTableSourceScanRule.scala   | 18 +++++++++++++++---
 ...ceRule.scala => BatchExecTableSourceScanRule.scala} |  6 +++---
 .../flink/table/plan/schema/TableSourceTable.scala     |  9 +++++++--
 .../table/runtime/batch/sql/TableSourceITCase.scala    |  2 --
 .../org/apache/flink/table/util/testTableSources.scala | 10 ++--------
 8 files changed, 35 insertions(+), 26 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
index c6d442c..75f154a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
@@ -102,7 +102,7 @@ class BatchExecTableSourceScan(
     val fieldIndexes = TableSourceUtil.computeIndexMapping(
       tableSource,
       isStreamTable = false,
-      None)
+      tableSourceTable.selectedFields)
 
     val inputDataType = fromLegacyInfoToDataType(inputTransform.getOutputType)
     val producedDataType = tableSource.getProducedDataType
@@ -118,7 +118,7 @@ class BatchExecTableSourceScan(
     // get expression to extract rowtime attribute
     val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
       tableSource,
-      None,
+      tableSourceTable.selectedFields,
       cluster,
       planner.getRelBuilder
     )
@@ -144,7 +144,7 @@ class BatchExecTableSourceScan(
     val fieldIndexes = TableSourceUtil.computeIndexMapping(
       tableSource,
       isStreamTable = false,
-      None)
+      tableSourceTable.selectedFields)
     ScanUtil.hasTimeAttributeField(fieldIndexes) ||
       ScanUtil.needsConversion(
         tableSource.getProducedDataType,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
index bed598a..a67f4c1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
@@ -114,7 +114,7 @@ class StreamExecTableSourceScan(
     val fieldIndexes = TableSourceUtil.computeIndexMapping(
       tableSource,
       isStreamTable = true,
-      None)
+      tableSourceTable.selectedFields)
 
     val inputDataType = fromLegacyInfoToDataType(inputTransform.getOutputType)
     val producedDataType = tableSource.getProducedDataType
@@ -130,7 +130,7 @@ class StreamExecTableSourceScan(
     // get expression to extract rowtime attribute
     val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
       tableSource,
-      None,
+      tableSourceTable.selectedFields,
       cluster,
       planner.getRelBuilder
     )
@@ -166,7 +166,7 @@ class StreamExecTableSourceScan(
 
     // generate watermarks for rowtime indicator
     val rowtimeDesc: Option[RowtimeAttributeDescriptor] =
-      TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, None)
+      TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, tableSourceTable.selectedFields)
 
     val withWatermarks = if (rowtimeDesc.isDefined) {
       val rowtimeFieldIdx = getRowType.getFieldNames.indexOf(rowtimeDesc.get.getAttributeName)
@@ -195,7 +195,7 @@ class StreamExecTableSourceScan(
     val fieldIndexes = TableSourceUtil.computeIndexMapping(
       tableSource,
       isStreamTable = true,
-      None)
+      tableSourceTable.selectedFields)
     ScanUtil.hasTimeAttributeField(fieldIndexes) ||
       ScanUtil.needsConversion(
         tableSource.getProducedDataType,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
index 6538ca6..d353840 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
@@ -362,7 +362,7 @@ object FlinkBatchRuleSets {
     FlinkExpandConversionRule.BATCH_INSTANCE,
     // source
     BatchExecBoundedStreamScanRule.INSTANCE,
-    BatchExecScanTableSourceRule.INSTANCE,
+    BatchExecTableSourceScanRule.INSTANCE,
     BatchExecIntermediateTableScanRule.INSTANCE,
     BatchExecValuesRule.INSTANCE,
     // calc
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
index 7518844..3f3395c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
@@ -22,11 +22,11 @@ import org.apache.flink.table.plan.schema.{FlinkRelOptTable, TableSourceTable}
 import org.apache.flink.table.plan.util._
 import org.apache.flink.table.sources._
 import org.apache.flink.util.CollectionUtil
-
 import org.apache.calcite.plan.RelOptRule.{none, operand}
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.logical.{LogicalProject, LogicalTableScan}
 import org.apache.calcite.rel.rules.ProjectRemoveRule
+import org.apache.flink.table.api.TableException
 
 /**
   * Planner rule that pushes a [[LogicalProject]] into a [[LogicalTableScan]]
@@ -66,7 +66,8 @@ class PushProjectIntoTableSourceScanRule extends RelOptRule(
     val relOptTable = scan.getTable.asInstanceOf[FlinkRelOptTable]
     val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
 
-    val newTableSource = tableSourceTable.tableSource match {
+    val oldTableSource = tableSourceTable.tableSource
+    val newTableSource = oldTableSource match {
       case nested: NestedFieldsProjectableTableSource[_] =>
         val nestedFields = RexNodeExtractor.extractRefNestedInputFields(
           project.getProjects, usedFields)
@@ -74,9 +75,20 @@ class PushProjectIntoTableSourceScanRule extends RelOptRule(
       case projecting: ProjectableTableSource[_] =>
         projecting.projectFields(usedFields)
     }
+
+    // check that table schema of the new table source is identical to original
+    if (oldTableSource.getTableSchema != newTableSource.getTableSchema) {
+      throw new TableException("TableSchema of ProjectableTableSource must not be modified " +
+        "by projectFields() call. This is a bug in the implementation of the TableSource " +
+        s"${oldTableSource.getClass.getCanonicalName}.")
+    }
+
     // project push down does not change the statistic, we can reuse origin statistic
     val newTableSourceTable = new TableSourceTable(
-      newTableSource, tableSourceTable.isStreamingMode, tableSourceTable.statistic)
+      newTableSource,
+      tableSourceTable.isStreamingMode,
+      tableSourceTable.statistic,
+      Option(usedFields))
     // row type is changed after project push down
     val newRowType = newTableSourceTable.getRowType(scan.getCluster.getTypeFactory)
     val newRelOptTable = relOptTable.copy(newTableSourceTable, newRowType)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecScanTableSourceRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecTableSourceScanRule.scala
similarity index 94%
rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecScanTableSourceRule.scala
rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecTableSourceScanRule.scala
index 0d702db..415b963 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecScanTableSourceRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecTableSourceScanRule.scala
@@ -31,7 +31,7 @@ import org.apache.flink.table.sources.StreamTableSource
 /**
   * Rule that converts [[FlinkLogicalTableSourceScan]] to [[BatchExecTableSourceScan]].
   */
-class BatchExecScanTableSourceRule
+class BatchExecTableSourceScanRule
   extends ConverterRule(
     classOf[FlinkLogicalTableSourceScan],
     FlinkConventions.LOGICAL,
@@ -63,6 +63,6 @@ class BatchExecScanTableSourceRule
   }
 }
 
-object BatchExecScanTableSourceRule {
-  val INSTANCE: RelOptRule = new BatchExecScanTableSourceRule
+object BatchExecTableSourceScanRule {
+  val INSTANCE: RelOptRule = new BatchExecTableSourceScanRule
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
index 16da7c3..1807c05 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
@@ -34,16 +34,21 @@ import org.apache.flink.table.sources.{TableSource, TableSourceUtil}
 class TableSourceTable[T](
     val tableSource: TableSource[T],
     val isStreamingMode: Boolean,
-    val statistic: FlinkStatistic)
+    val statistic: FlinkStatistic,
+    val selectedFields: Option[Array[Int]])
   extends FlinkTable {
 
+  def this(tableSource: TableSource[T], isStreamingMode: Boolean, statistic: FlinkStatistic) {
+    this(tableSource, isStreamingMode, statistic, None)
+  }
+
   // TODO implements this
   // TableSourceUtil.validateTableSource(tableSource)
 
   override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
     TableSourceUtil.getRelDataType(
       tableSource,
-      None,
+      selectedFields,
       streaming = isStreamingMode,
       typeFactory.asInstanceOf[FlinkTypeFactory])
   }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
index 48a0621..fcda2f9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
@@ -179,8 +179,6 @@ class TableSourceITCase extends BatchTestBase {
     )
   }
 
-  @Ignore("[FLINK-13075] Project pushdown rule shouldn't require" +
-    " the TableSource return a modified schema in blink planner")
   @Test
   def testLookupJoinCsvTemporalTable(): Unit = {
     val orders = TestTableSources.getOrdersCsvTableSource
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
index fa73905..93df633 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
@@ -244,9 +244,6 @@ class TestProjectableTableSource(
       projectedTypes.asInstanceOf[Array[TypeInformation[_]]],
       projectedNames)
 
-    val projectedDataTypes = fields.map(tableSchema.getFieldDataTypes.apply(_))
-    val newTableSchema = TableSchema.builder().fields(projectedNames, projectedDataTypes).build()
-
     val projectedValues = values.map { fromRow =>
       val pRow = new Row(fields.length)
       fields.zipWithIndex.foreach{ case (from, to) => pRow.setField(to, fromRow.getField(from)) }
@@ -255,7 +252,7 @@ class TestProjectableTableSource(
 
     new TestProjectableTableSource(
       isBounded,
-      newTableSchema,
+      tableSchema,
       projectedReturnType,
      projectedValues,
      rowtime,
@@ -304,9 +301,6 @@ class TestNestedProjectableTableSource(
     val newReadNestedFields = projectedNames.zip(nestedFields)
       .flatMap(f => f._2.map(n => s"${f._1}.$n"))
 
-    val projectedDataTypes = fields.map(tableSchema.getFieldDataTypes.apply(_))
-    val newTableSchema = TableSchema.builder().fields(projectedNames, projectedDataTypes).build()
-
     val projectedValues = values.map { fromRow =>
       val pRow = new Row(fields.length)
       fields.zipWithIndex.foreach{ case (from, to) => pRow.setField(to, fromRow.getField(from)) }
@@ -315,7 +309,7 @@ class TestNestedProjectableTableSource(
 
     val copy = new TestNestedProjectableTableSource(
       isBounded,
-      newTableSchema,
+      tableSchema,
       projectedReturnType,
       projectedValues,
       rowtime,
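
Note for TableSource implementors (not part of the patch above): with the new check in PushProjectIntoTableSourceScanRule, projectFields() must return a source whose getTableSchema is identical to the original; only the produced type and the emitted rows may shrink, and the planner now carries the projected indexes in TableSourceTable.selectedFields. The following is a minimal sketch of a source that satisfies this contract; the class name MyProjectableSource is hypothetical and Flink 1.9 table/streaming APIs are assumed.

import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.TableSchema
import org.apache.flink.table.sources.{ProjectableTableSource, StreamTableSource, TableSource}
import org.apache.flink.table.types.DataType
import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
import org.apache.flink.types.Row

import scala.collection.JavaConverters._

// Hypothetical example source; illustrates the projectFields() contract only.
class MyProjectableSource(
    schema: TableSchema,
    values: Seq[Row],
    selected: Option[Array[Int]] = None)
  extends StreamTableSource[Row]
  with ProjectableTableSource[Row] {

  // field indexes that are actually read; defaults to all fields
  private val fields: Array[Int] = selected.getOrElse(schema.getFieldNames.indices.toArray)

  // type information for the selected fields only
  private def producedTypeInfo: RowTypeInfo = new RowTypeInfo(
    fields.map(schema.getFieldTypes.apply(_)),
    fields.map(schema.getFieldNames.apply(_)))

  // the full, original schema: must be identical before and after projectFields()
  override def getTableSchema: TableSchema = schema

  // the produced (physical) type is allowed to shrink to the selected fields
  override def getProducedDataType: DataType = fromLegacyInfoToDataType(producedTypeInfo)

  // return a copy that remembers the projection but keeps the original schema
  override def projectFields(selectedFields: Array[Int]): TableSource[Row] =
    new MyProjectableSource(schema, values, Some(selectedFields))

  override def isBounded: Boolean = true

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    // emit only the selected fields, in projection order
    val projected = values.map { fromRow =>
      val pRow = new Row(fields.length)
      fields.zipWithIndex.foreach { case (from, to) => pRow.setField(to, fromRow.getField(from)) }
      pRow
    }
    execEnv.fromCollection(projected.asJava, producedTypeInfo)
  }

  override def explainSource(): String =
    s"MyProjectableSource(read fields: ${fields.mkString(", ")})"
}

This mirrors what the patched TestProjectableTableSource above now does: the projected copy is handed the original tableSchema instead of a rebuilt newTableSchema, while TableSourceTable.selectedFields carries the projected indexes down to the physical scan nodes.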