This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new e1e62e3 [FLINK-13075][table-planner-blink] Project pushdown rule shouldn't require the TableSource return a modified schema in blink planner
e1e62e3 is described below
commit e1e62e3629a8b5055dbab70709b9790cda33fa30
Author: TsReaper <[email protected]>
AuthorDate: Mon Jul 22 17:47:40 2019 +0800
[FLINK-13075][table-planner-blink] Project pushdown rule shouldn't require the TableSource return a modified schema in blink planner
This closes #9197
---
.../physical/batch/BatchExecTableSourceScan.scala | 6 +++---
.../physical/stream/StreamExecTableSourceScan.scala | 8 ++++----
.../flink/table/plan/rules/FlinkBatchRuleSets.scala | 2 +-
.../logical/PushProjectIntoTableSourceScanRule.scala | 18 +++++++++++++++---
...ceRule.scala => BatchExecTableSourceScanRule.scala} | 6 +++---
.../flink/table/plan/schema/TableSourceTable.scala | 9 +++++++--
.../table/runtime/batch/sql/TableSourceITCase.scala | 2 --
.../org/apache/flink/table/util/testTableSources.scala | 10 ++--------
8 files changed, 35 insertions(+), 26 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
index c6d442c..75f154a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
@@ -102,7 +102,7 @@ class BatchExecTableSourceScan(
val fieldIndexes = TableSourceUtil.computeIndexMapping(
tableSource,
isStreamTable = false,
- None)
+ tableSourceTable.selectedFields)
val inputDataType = fromLegacyInfoToDataType(inputTransform.getOutputType)
val producedDataType = tableSource.getProducedDataType
@@ -118,7 +118,7 @@ class BatchExecTableSourceScan(
// get expression to extract rowtime attribute
val rowtimeExpression: Option[RexNode] =
TableSourceUtil.getRowtimeExtractionExpression(
tableSource,
- None,
+ tableSourceTable.selectedFields,
cluster,
planner.getRelBuilder
)
@@ -144,7 +144,7 @@ class BatchExecTableSourceScan(
val fieldIndexes = TableSourceUtil.computeIndexMapping(
tableSource,
isStreamTable = false,
- None)
+ tableSourceTable.selectedFields)
ScanUtil.hasTimeAttributeField(fieldIndexes) ||
ScanUtil.needsConversion(
tableSource.getProducedDataType,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
index bed598a..a67f4c1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
@@ -114,7 +114,7 @@ class StreamExecTableSourceScan(
val fieldIndexes = TableSourceUtil.computeIndexMapping(
tableSource,
isStreamTable = true,
- None)
+ tableSourceTable.selectedFields)
val inputDataType = fromLegacyInfoToDataType(inputTransform.getOutputType)
val producedDataType = tableSource.getProducedDataType
@@ -130,7 +130,7 @@ class StreamExecTableSourceScan(
// get expression to extract rowtime attribute
val rowtimeExpression: Option[RexNode] =
TableSourceUtil.getRowtimeExtractionExpression(
tableSource,
- None,
+ tableSourceTable.selectedFields,
cluster,
planner.getRelBuilder
)
@@ -166,7 +166,7 @@ class StreamExecTableSourceScan(
// generate watermarks for rowtime indicator
val rowtimeDesc: Option[RowtimeAttributeDescriptor] =
- TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, None)
+ TableSourceUtil.getRowtimeAttributeDescriptor(tableSource,
tableSourceTable.selectedFields)
val withWatermarks = if (rowtimeDesc.isDefined) {
val rowtimeFieldIdx =
getRowType.getFieldNames.indexOf(rowtimeDesc.get.getAttributeName)
@@ -195,7 +195,7 @@ class StreamExecTableSourceScan(
val fieldIndexes = TableSourceUtil.computeIndexMapping(
tableSource,
isStreamTable = true,
- None)
+ tableSourceTable.selectedFields)
ScanUtil.hasTimeAttributeField(fieldIndexes) ||
ScanUtil.needsConversion(
tableSource.getProducedDataType,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
index 6538ca6..d353840 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
@@ -362,7 +362,7 @@ object FlinkBatchRuleSets {
FlinkExpandConversionRule.BATCH_INSTANCE,
// source
BatchExecBoundedStreamScanRule.INSTANCE,
- BatchExecScanTableSourceRule.INSTANCE,
+ BatchExecTableSourceScanRule.INSTANCE,
BatchExecIntermediateTableScanRule.INSTANCE,
BatchExecValuesRule.INSTANCE,
// calc
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
index 7518844..3f3395c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
@@ -22,11 +22,11 @@ import
org.apache.flink.table.plan.schema.{FlinkRelOptTable, TableSourceTable}
import org.apache.flink.table.plan.util._
import org.apache.flink.table.sources._
import org.apache.flink.util.CollectionUtil
-
import org.apache.calcite.plan.RelOptRule.{none, operand}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.logical.{LogicalProject, LogicalTableScan}
import org.apache.calcite.rel.rules.ProjectRemoveRule
+import org.apache.flink.table.api.TableException
/**
* Planner rule that pushes a [[LogicalProject]] into a [[LogicalTableScan]]
@@ -66,7 +66,8 @@ class PushProjectIntoTableSourceScanRule extends RelOptRule(
val relOptTable = scan.getTable.asInstanceOf[FlinkRelOptTable]
val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
- val newTableSource = tableSourceTable.tableSource match {
+ val oldTableSource = tableSourceTable.tableSource
+ val newTableSource = oldTableSource match {
case nested: NestedFieldsProjectableTableSource[_] =>
val nestedFields = RexNodeExtractor.extractRefNestedInputFields(
project.getProjects, usedFields)
@@ -74,9 +75,20 @@ class PushProjectIntoTableSourceScanRule extends RelOptRule(
case projecting: ProjectableTableSource[_] =>
projecting.projectFields(usedFields)
}
+
+ // check that table schema of the new table source is identical to original
+ if (oldTableSource.getTableSchema != newTableSource.getTableSchema) {
+ throw new TableException("TableSchema of ProjectableTableSource must not
be modified " +
+ "by projectFields() call. This is a bug in the implementation of the
TableSource " +
+ s"${oldTableSource.getClass.getCanonicalName}.")
+ }
+
// project push down does not change the statistic, we can reuse origin
statistic
val newTableSourceTable = new TableSourceTable(
- newTableSource, tableSourceTable.isStreamingMode,
tableSourceTable.statistic)
+ newTableSource,
+ tableSourceTable.isStreamingMode,
+ tableSourceTable.statistic,
+ Option(usedFields))
// row type is changed after project push down
val newRowType =
newTableSourceTable.getRowType(scan.getCluster.getTypeFactory)
val newRelOptTable = relOptTable.copy(newTableSourceTable, newRowType)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecScanTableSourceRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecTableSourceScanRule.scala
similarity index 94%
rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecScanTableSourceRule.scala
rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecTableSourceScanRule.scala
index 0d702db..415b963 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecScanTableSourceRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecTableSourceScanRule.scala
@@ -31,7 +31,7 @@ import org.apache.flink.table.sources.StreamTableSource
/**
* Rule that converts [[FlinkLogicalTableSourceScan]] to
[[BatchExecTableSourceScan]].
*/
-class BatchExecScanTableSourceRule
+class BatchExecTableSourceScanRule
extends ConverterRule(
classOf[FlinkLogicalTableSourceScan],
FlinkConventions.LOGICAL,
@@ -63,6 +63,6 @@ class BatchExecScanTableSourceRule
}
}
-object BatchExecScanTableSourceRule {
- val INSTANCE: RelOptRule = new BatchExecScanTableSourceRule
+object BatchExecTableSourceScanRule {
+ val INSTANCE: RelOptRule = new BatchExecTableSourceScanRule
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
index 16da7c3..1807c05 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
@@ -34,16 +34,21 @@ import org.apache.flink.table.sources.{TableSource,
TableSourceUtil}
class TableSourceTable[T](
val tableSource: TableSource[T],
val isStreamingMode: Boolean,
- val statistic: FlinkStatistic)
+ val statistic: FlinkStatistic,
+ val selectedFields: Option[Array[Int]])
extends FlinkTable {
+ def this(tableSource: TableSource[T], isStreamingMode: Boolean, statistic:
FlinkStatistic) {
+ this(tableSource, isStreamingMode, statistic, None)
+ }
+
// TODO implements this
// TableSourceUtil.validateTableSource(tableSource)
override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
TableSourceUtil.getRelDataType(
tableSource,
- None,
+ selectedFields,
streaming = isStreamingMode,
typeFactory.asInstanceOf[FlinkTypeFactory])
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
index 48a0621..fcda2f9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
@@ -179,8 +179,6 @@ class TableSourceITCase extends BatchTestBase {
)
}
- @Ignore("[FLINK-13075] Project pushdown rule shouldn't require" +
- " the TableSource return a modified schema in blink planner")
@Test
def testLookupJoinCsvTemporalTable(): Unit = {
val orders = TestTableSources.getOrdersCsvTableSource
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
index fa73905..93df633 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
@@ -244,9 +244,6 @@ class TestProjectableTableSource(
projectedTypes.asInstanceOf[Array[TypeInformation[_]]],
projectedNames)
- val projectedDataTypes = fields.map(tableSchema.getFieldDataTypes.apply(_))
- val newTableSchema = TableSchema.builder().fields(projectedNames,
projectedDataTypes).build()
-
val projectedValues = values.map { fromRow =>
val pRow = new Row(fields.length)
fields.zipWithIndex.foreach{ case (from, to) => pRow.setField(to,
fromRow.getField(from)) }
@@ -255,7 +252,7 @@ class TestProjectableTableSource(
new TestProjectableTableSource(
isBounded,
- newTableSchema,
+ tableSchema,
projectedReturnType,
projectedValues,
rowtime,
@@ -304,9 +301,6 @@ class TestNestedProjectableTableSource(
val newReadNestedFields = projectedNames.zip(nestedFields)
.flatMap(f => f._2.map(n => s"${f._1}.$n"))
- val projectedDataTypes = fields.map(tableSchema.getFieldDataTypes.apply(_))
- val newTableSchema = TableSchema.builder().fields(projectedNames,
projectedDataTypes).build()
-
val projectedValues = values.map { fromRow =>
val pRow = new Row(fields.length)
fields.zipWithIndex.foreach{ case (from, to) => pRow.setField(to,
fromRow.getField(from)) }
@@ -315,7 +309,7 @@ class TestNestedProjectableTableSource(
val copy = new TestNestedProjectableTableSource(
isBounded,
- newTableSchema,
+ tableSchema,
projectedReturnType,
projectedValues,
rowtime,