This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new e1e62e3  [FLINK-13075][table-planner-blink] Project pushdown rule shouldn't require the TableSource return a modified schema in blink planner
e1e62e3 is described below

commit e1e62e3629a8b5055dbab70709b9790cda33fa30
Author: TsReaper <tsreape...@gmail.com>
AuthorDate: Mon Jul 22 17:47:40 2019 +0800

    [FLINK-13075][table-planner-blink] Project pushdown rule shouldn't require the TableSource return a modified schema in blink planner
    
    This closes #9197
---
 .../physical/batch/BatchExecTableSourceScan.scala      |  6 +++---
 .../physical/stream/StreamExecTableSourceScan.scala    |  8 ++++----
 .../flink/table/plan/rules/FlinkBatchRuleSets.scala    |  2 +-
 .../logical/PushProjectIntoTableSourceScanRule.scala   | 18 +++++++++++++++---
 ...ceRule.scala => BatchExecTableSourceScanRule.scala} |  6 +++---
 .../flink/table/plan/schema/TableSourceTable.scala     |  9 +++++++--
 .../table/runtime/batch/sql/TableSourceITCase.scala    |  2 --
 .../org/apache/flink/table/util/testTableSources.scala | 10 ++--------
 8 files changed, 35 insertions(+), 26 deletions(-)
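
For context, the contract this commit enforces: a ProjectableTableSource's projectFields() must narrow what the source produces, while getTableSchema() keeps returning the full original schema; the projection itself is now tracked by the planner via selectedFields (see the TableSourceTable change below). A minimal sketch with plain-Scala stand-ins (Schema and ToySource are illustrative names, not Flink API):

    // Plain-Scala model of the contract, not the real TableSource interfaces.
    final case class Schema(fieldNames: Seq[String])

    final class ToySource(val schema: Schema, val producedFields: Array[Int]) {
      // analogue of TableSource#getTableSchema: always the FULL original
      // schema, even after projection push-down
      def tableSchema: Schema = schema

      // analogue of TableSource#getProducedDataType: only the projected fields
      def producedNames: Seq[String] =
        producedFields.toSeq.map(i => schema.fieldNames(i))

      // analogue of ProjectableTableSource#projectFields: narrows what is
      // read, but must NOT touch the reported table schema
      def projectFields(fields: Array[Int]): ToySource =
        new ToySource(schema, fields)
    }

    val src = new ToySource(Schema(Seq("id", "name", "score")), Array(0, 1, 2))
    val projected = src.projectFields(Array(2, 0))
    assert(projected.tableSchema == src.tableSchema)      // schema unchanged
    assert(projected.producedNames == Seq("score", "id")) // output narrowed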

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
index c6d442c..75f154a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
@@ -102,7 +102,7 @@ class BatchExecTableSourceScan(
     val fieldIndexes = TableSourceUtil.computeIndexMapping(
       tableSource,
       isStreamTable = false,
-      None)
+      tableSourceTable.selectedFields)
 
     val inputDataType = fromLegacyInfoToDataType(inputTransform.getOutputType)
     val producedDataType = tableSource.getProducedDataType
@@ -118,7 +118,7 @@ class BatchExecTableSourceScan(
     // get expression to extract rowtime attribute
     val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
       tableSource,
-      None,
+      tableSourceTable.selectedFields,
       cluster,
       planner.getRelBuilder
     )
@@ -144,7 +144,7 @@ class BatchExecTableSourceScan(
     val fieldIndexes = TableSourceUtil.computeIndexMapping(
       tableSource,
       isStreamTable = false,
-      None)
+      tableSourceTable.selectedFields)
     ScanUtil.hasTimeAttributeField(fieldIndexes) ||
       ScanUtil.needsConversion(
         tableSource.getProducedDataType,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
index bed598a..a67f4c1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
@@ -114,7 +114,7 @@ class StreamExecTableSourceScan(
     val fieldIndexes = TableSourceUtil.computeIndexMapping(
       tableSource,
       isStreamTable = true,
-      None)
+      tableSourceTable.selectedFields)
 
     val inputDataType = fromLegacyInfoToDataType(inputTransform.getOutputType)
     val producedDataType = tableSource.getProducedDataType
@@ -130,7 +130,7 @@ class StreamExecTableSourceScan(
     // get expression to extract rowtime attribute
     val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
       tableSource,
-      None,
+      tableSourceTable.selectedFields,
       cluster,
       planner.getRelBuilder
     )
@@ -166,7 +166,7 @@ class StreamExecTableSourceScan(
 
     // generate watermarks for rowtime indicator
     val rowtimeDesc: Option[RowtimeAttributeDescriptor] =
-      TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, None)
+      TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, tableSourceTable.selectedFields)
 
     val withWatermarks = if (rowtimeDesc.isDefined) {
       val rowtimeFieldIdx = getRowType.getFieldNames.indexOf(rowtimeDesc.get.getAttributeName)
@@ -195,7 +195,7 @@ class StreamExecTableSourceScan(
     val fieldIndexes = TableSourceUtil.computeIndexMapping(
       tableSource,
       isStreamTable = true,
-      None)
+      tableSourceTable.selectedFields)
     ScanUtil.hasTimeAttributeField(fieldIndexes) ||
       ScanUtil.needsConversion(
         tableSource.getProducedDataType,
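
In both the batch and stream scans above the fix is the same: after projection push-down the source only produces the selected fields, so deriving the index mapping or the rowtime expression from the full schema (the old None argument) leaves fields with no counterpart in the produced type. A toy model of the idea, matching by name purely for illustration; this is not the real TableSourceUtil.computeIndexMapping signature:

    // For each field the planner expects in the scan's output, find its index
    // in the record the source actually produces after push-down.
    def indexMapping(
        fullNames: Seq[String],            // full table schema
        producedNames: Seq[String],        // what the source emits
        selectedFields: Option[Array[Int]] // projection recorded by the rule
      ): Seq[Int] = {
      val expected = selectedFields
        .map(_.toSeq.map(i => fullNames(i)))
        .getOrElse(fullNames)
      expected.map(n => producedNames.indexOf(n))
    }

    val full = Seq("id", "name", "score")
    val produced = Seq("score", "id")                // after projectFields(Array(2, 0))
    indexMapping(full, produced, Some(Array(2, 0)))  // Seq(0, 1): consistent
    indexMapping(full, produced, None)               // Seq(1, -1, 0): "name" dangles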
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
index 6538ca6..d353840 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
@@ -362,7 +362,7 @@ object FlinkBatchRuleSets {
     FlinkExpandConversionRule.BATCH_INSTANCE,
     // source
     BatchExecBoundedStreamScanRule.INSTANCE,
-    BatchExecScanTableSourceRule.INSTANCE,
+    BatchExecTableSourceScanRule.INSTANCE,
     BatchExecIntermediateTableScanRule.INSTANCE,
     BatchExecValuesRule.INSTANCE,
     // calc
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
index 7518844..3f3395c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
@@ -22,11 +22,11 @@ import org.apache.flink.table.plan.schema.{FlinkRelOptTable, TableSourceTable}
 import org.apache.flink.table.plan.util._
 import org.apache.flink.table.sources._
 import org.apache.flink.util.CollectionUtil
-
 import org.apache.calcite.plan.RelOptRule.{none, operand}
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.logical.{LogicalProject, LogicalTableScan}
 import org.apache.calcite.rel.rules.ProjectRemoveRule
+import org.apache.flink.table.api.TableException
 
 /**
   * Planner rule that pushes a [[LogicalProject]] into a [[LogicalTableScan]]
@@ -66,7 +66,8 @@ class PushProjectIntoTableSourceScanRule extends RelOptRule(
 
     val relOptTable = scan.getTable.asInstanceOf[FlinkRelOptTable]
     val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
-    val newTableSource = tableSourceTable.tableSource match {
+    val oldTableSource = tableSourceTable.tableSource
+    val newTableSource = oldTableSource match {
       case nested: NestedFieldsProjectableTableSource[_] =>
         val nestedFields = RexNodeExtractor.extractRefNestedInputFields(
           project.getProjects, usedFields)
@@ -74,9 +75,20 @@ class PushProjectIntoTableSourceScanRule extends RelOptRule(
       case projecting: ProjectableTableSource[_] =>
         projecting.projectFields(usedFields)
     }
+
+    // check that table schema of the new table source is identical to original
+    if (oldTableSource.getTableSchema != newTableSource.getTableSchema) {
+      throw new TableException("TableSchema of ProjectableTableSource must not be modified " +
+        "by projectFields() call. This is a bug in the implementation of the TableSource " +
+        s"${oldTableSource.getClass.getCanonicalName}.")
+
     // project push down does not change the statistic, we can reuse origin statistic
     val newTableSourceTable = new TableSourceTable(
-      newTableSource, tableSourceTable.isStreamingMode, tableSourceTable.statistic)
+      newTableSource,
+      tableSourceTable.isStreamingMode,
+      tableSourceTable.statistic,
+      Option(usedFields))
     // row type is changed after project push down
     val newRowType = newTableSourceTable.getRowType(scan.getCluster.getTypeFactory)
     val newRelOptTable = relOptTable.copy(newTableSourceTable, newRowType)
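
The guard above makes a contract violation fail fast with a TableException instead of letting the planner build a plan against the wrong schema. In the stand-in terms of the earlier sketch, this is what it rejects:

    // A non-conforming projectFields() rebuilds the schema from the projected
    // columns; comparing it against the original is exactly the new check.
    val fullSchema = Seq("id", "name", "score")
    val usedFields = Array(2, 0)
    val rebuiltSchema = usedFields.toSeq.map(i => fullSchema(i)) // Seq("score", "id")
    // oldTableSource.getTableSchema != newTableSource.getTableSchema
    assert(rebuiltSchema != fullSchema) // the rule now throws a TableException here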
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecScanTableSourceRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecTableSourceScanRule.scala
similarity index 94%
rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecScanTableSourceRule.scala
rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecTableSourceScanRule.scala
index 0d702db..415b963 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecScanTableSourceRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecTableSourceScanRule.scala
@@ -31,7 +31,7 @@ import org.apache.flink.table.sources.StreamTableSource
 /**
   * Rule that converts [[FlinkLogicalTableSourceScan]] to [[BatchExecTableSourceScan]].
   */
-class BatchExecScanTableSourceRule
+class BatchExecTableSourceScanRule
   extends ConverterRule(
     classOf[FlinkLogicalTableSourceScan],
     FlinkConventions.LOGICAL,
@@ -63,6 +63,6 @@ class BatchExecScanTableSourceRule
   }
 }
 
-object BatchExecScanTableSourceRule {
-  val INSTANCE: RelOptRule = new BatchExecScanTableSourceRule
+object BatchExecTableSourceScanRule {
+  val INSTANCE: RelOptRule = new BatchExecTableSourceScanRule
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
index 16da7c3..1807c05 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
@@ -34,16 +34,21 @@ import org.apache.flink.table.sources.{TableSource, TableSourceUtil}
 class TableSourceTable[T](
     val tableSource: TableSource[T],
     val isStreamingMode: Boolean,
-    val statistic: FlinkStatistic)
+    val statistic: FlinkStatistic,
+    val selectedFields: Option[Array[Int]])
   extends FlinkTable {
 
+  def this(tableSource: TableSource[T], isStreamingMode: Boolean, statistic: FlinkStatistic) {
+    this(tableSource, isStreamingMode, statistic, None)
+  }
+
   // TODO implements this
   // TableSourceUtil.validateTableSource(tableSource)
 
   override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
     TableSourceUtil.getRelDataType(
       tableSource,
-      None,
+      selectedFields,
       streaming = isStreamingMode,
       typeFactory.asInstanceOf[FlinkTypeFactory])
   }
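
With selectedFields stored on TableSourceTable, getRowType can derive the projected row type while the source keeps its full schema. Conceptually (the real logic lives in TableSourceUtil.getRelDataType, whose internals are not part of this diff):

    // None keeps the full schema; Some(indices) keeps only those columns,
    // in projection order.
    def projectRowType(
        fullNames: Seq[String],
        selectedFields: Option[Array[Int]]): Seq[String] =
      selectedFields
        .map(_.toSeq.map(i => fullNames(i)))
        .getOrElse(fullNames)

    projectRowType(Seq("id", "name", "score"), None)              // full row type
    projectRowType(Seq("id", "name", "score"), Some(Array(2, 0))) // Seq("score", "id")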
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
index 48a0621..fcda2f9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
@@ -179,8 +179,6 @@ class TableSourceITCase extends BatchTestBase {
     )
   }
 
-  @Ignore("[FLINK-13075] Project pushdown rule shouldn't require" +
-    " the TableSource return a modified schema in blink planner")
   @Test
   def testLookupJoinCsvTemporalTable(): Unit = {
     val orders = TestTableSources.getOrdersCsvTableSource
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
index fa73905..93df633 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
@@ -244,9 +244,6 @@ class TestProjectableTableSource(
       projectedTypes.asInstanceOf[Array[TypeInformation[_]]],
       projectedNames)
 
-    val projectedDataTypes = fields.map(tableSchema.getFieldDataTypes.apply(_))
-    val newTableSchema = TableSchema.builder().fields(projectedNames, projectedDataTypes).build()
-
     val projectedValues = values.map { fromRow =>
       val pRow = new Row(fields.length)
       fields.zipWithIndex.foreach{ case (from, to) => pRow.setField(to, fromRow.getField(from)) }
@@ -255,7 +252,7 @@ class TestProjectableTableSource(
 
     new TestProjectableTableSource(
       isBounded,
-      newTableSchema,
+      tableSchema,
       projectedReturnType,
       projectedValues,
       rowtime,
@@ -304,9 +301,6 @@ class TestNestedProjectableTableSource(
     val newReadNestedFields = projectedNames.zip(nestedFields)
       .flatMap(f => f._2.map(n => s"${f._1}.$n"))
 
-    val projectedDataTypes = fields.map(tableSchema.getFieldDataTypes.apply(_))
-    val newTableSchema = TableSchema.builder().fields(projectedNames, projectedDataTypes).build()
-
     val projectedValues = values.map { fromRow =>
       val pRow = new Row(fields.length)
       fields.zipWithIndex.foreach{ case (from, to) => pRow.setField(to, fromRow.getField(from)) }
@@ -315,7 +309,7 @@ class TestNestedProjectableTableSource(
 
     val copy = new TestNestedProjectableTableSource(
       isBounded,
-      newTableSchema,
+      tableSchema,
       projectedReturnType,
       projectedValues,
       rowtime,
