This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 69e7358063cb2225a1229d5869eae54d1a3905ad
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Dec 13 09:49:33 2019 +0100

    [FLINK-15168][table-planner-blink] Compute physical indices based on new type hierarchy instead of TypeInformation
---
 .../table/planner/calcite/FlinkTypeFactory.scala   |  13 +
 .../physical/batch/BatchExecTableSourceScan.scala  |  50 +++-
 .../stream/StreamExecTableSourceScan.scala         |  52 ++--
 .../table/planner/sources/TableSourceUtil.scala    | 314 ++++++---------------
 4 files changed, 166 insertions(+), 263 deletions(-)
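
For orientation, a minimal sketch (not part of the commit) of how the index mapping changes: the old path resolved table fields against the source's TypeInformation via TableSourceUtil.computeIndexMapping, while the new path derives TableColumns from the planner row type and delegates to TypeMappingUtils. All identifiers are taken from the hunks below; the surrounding vals exist only for illustration.

    // Old entry point (TypeInformation-based), removed in this commit:
    val oldIndexes: Array[Int] = TableSourceUtil.computeIndexMapping(
      tableSource,
      FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType),
      tableSourceTable.isStreamingMode)

    // New entry point, driven by the DataType/LogicalType hierarchy:
    val newIndexes: Array[Int] =
      TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
        tableSource,
        FlinkTypeFactory.toTableSchema(getRowType).getTableColumns,
        true,        // streaming time-attribute markers; the batch scan node passes false
        nameMapping) // logical -> physical field names, identity by default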

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
index 5a5db1f..f092398 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
@@ -23,6 +23,8 @@ import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.table.api.{DataTypes, TableException, TableSchema}
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType
 import org.apache.flink.table.planner.plan.schema.{GenericRelDataType, _}
+import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter
+import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.logical._
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 import org.apache.flink.types.Nothing
@@ -485,6 +487,17 @@ object FlinkTypeFactory {
     logicalType.copy(relDataType.isNullable)
   }
 
+  def toTableSchema(relDataType: RelDataType): TableSchema = {
+    val fieldNames = relDataType.getFieldNames.toArray(new Array[String](0))
+    val fieldTypes = relDataType.getFieldList
+      .asScala
+      .map(field =>
+        LogicalTypeDataTypeConverter.fromLogicalTypeToDataType(
+          FlinkTypeFactory.toLogicalType(field.getType))
+      ).toArray
+    TableSchema.builder.fields(fieldNames, fieldTypes).build
+  }
+
   def toLogicalRowType(relType: RelDataType): RowType = {
     checkArgument(relType.isStruct)
     RowType.of(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
index 61080dc..38387c4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
@@ -37,16 +37,19 @@ import org.apache.flink.table.planner.plan.schema.TableSourceTable
 import org.apache.flink.table.planner.plan.utils.ScanUtil
 import org.apache.flink.table.planner.sources.TableSourceUtil
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter
-import org.apache.flink.table.sources.StreamTableSource
+import org.apache.flink.table.sources.{DefinedFieldMapping, StreamTableSource}
+import org.apache.flink.table.utils.TypeMappingUtils
 
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rex.RexNode
 
+import java.util.function.{Function => JFunction}
 import java.{lang, util}
 
 import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /**
   * Batch physical RelNode to read data from an external source defined by a
@@ -92,11 +95,7 @@ class BatchExecTableSourceScan(
     val config = planner.getTableConfig
     val inputTransform = getSourceTransformation(planner.getExecEnv)
 
-    val rowType = FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType)
-    val fieldIndexes = TableSourceUtil.computeIndexMapping(
-      tableSource,
-      rowType,
-      tableSourceTable.isStreamingMode)
+    val fieldIndexes = computeIndexMapping()
 
     val inputDataType = inputTransform.getOutputType
     val producedDataType = tableSource.getProducedDataType
@@ -110,19 +109,25 @@ class BatchExecTableSourceScan(
     }
 
     // get expression to extract rowtime attribute
-    val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
+    val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeAttributeDescriptor(
       tableSource,
-      tableSourceTable.getRowType,
-      cluster,
-      planner.getRelBuilder
+      tableSourceTable.getRowType
+    ).map(desc =>
+      TableSourceUtil.getRowtimeExtractionExpression(
+        desc.getTimestampExtractor,
+        producedDataType,
+        planner.getRelBuilder,
+        nameMapping
+      )
     )
+
     if (needInternalConversion) {
       // the produced type may not carry the correct precision user defined in DDL, because
       // it may be converted from legacy type. Fix precision using logical schema from DDL.
       // code generation requires the correct precision of input fields.
       val fixedProducedDataType = TableSourceUtil.fixPrecisionForProducedDataType(
         tableSource,
-        rowType)
+        FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType))
       ScanUtil.convertToInternalRow(
         CodeGeneratorContext(config),
         inputTransform.asInstanceOf[Transformation[Any]],
@@ -139,10 +144,7 @@ class BatchExecTableSourceScan(
   }
 
   def needInternalConversion: Boolean = {
-    val fieldIndexes = TableSourceUtil.computeIndexMapping(
-      tableSource,
-      FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType),
-      tableSourceTable.isStreamingMode)
+    val fieldIndexes = computeIndexMapping()
     ScanUtil.hasTimeAttributeField(fieldIndexes) ||
       ScanUtil.needsConversion(tableSource.getProducedDataType)
   }
@@ -163,4 +165,22 @@ class BatchExecTableSourceScan(
     ExecNode.setManagedMemoryWeight(env.addSource(func, tableSource.explainSource(), t)
         .getTransformation)
   }
+
+  private def computeIndexMapping()
+    : Array[Int] = {
+    TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
+      tableSource,
+      FlinkTypeFactory.toTableSchema(getRowType).getTableColumns,
+      false,
+      nameMapping
+    )
+  }
+
+  private lazy val nameMapping: JFunction[String, String] = tableSource match {
+    case mapping: DefinedFieldMapping if mapping.getFieldMapping != null =>
+      new JFunction[String, String] {
+        override def apply(t: String): String = mapping.getFieldMapping.get(t)
+      }
+    case _ => JFunction.identity()
+  }
 }
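
As a side note on the nameMapping helper added above: the lookup it performs can be sketched in isolation as below. The field names and the map content are made up for illustration; only the DefinedFieldMapping contract and the java.util.function.Function bridge come from the commit.

    import java.util.function.{Function => JFunction}

    // Hypothetical logical -> physical field mapping, as a DefinedFieldMapping
    // source would expose it through getFieldMapping().
    val fieldMapping = new java.util.HashMap[String, String]()
    fieldMapping.put("user_id", "uid")

    // Mirror of the pattern used in the scan nodes: wrap the map lookup in a
    // Java Function, or fall back to the identity mapping.
    val nameMapping: JFunction[String, String] =
      if (fieldMapping != null) {
        new JFunction[String, String] {
          override def apply(logicalName: String): String = fieldMapping.get(logicalName)
        }
      } else {
        JFunction.identity[String]()
      }

    // nameMapping.apply("user_id") returns "uid"; unmapped sources keep their names.
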
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
index dbf5bfd..fa9d505 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
@@ -41,8 +41,9 @@ import org.apache.flink.table.planner.sources.TableSourceUtil
 import org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter
 import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PreserveWatermarks, PunctuatedWatermarkAssigner}
-import org.apache.flink.table.sources.{RowtimeAttributeDescriptor, StreamTableSource}
+import org.apache.flink.table.sources.{DefinedFieldMapping, RowtimeAttributeDescriptor, StreamTableSource}
 import org.apache.flink.table.types.{DataType, FieldsDataType}
+import org.apache.flink.table.utils.TypeMappingUtils
 import org.apache.flink.types.Row
 
 import org.apache.calcite.plan._
@@ -51,6 +52,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rex.RexNode
 
 import java.util
+import java.util.function.{Function => JFunction}
 
 import scala.collection.JavaConversions._
 
@@ -102,17 +104,13 @@ class StreamExecTableSourceScan(
     val config = planner.getTableConfig
     val inputTransform = getSourceTransformation(planner.getExecEnv)
 
-    val rowType = FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType)
-    val fieldIndexes = TableSourceUtil.computeIndexMapping(
-      tableSource,
-      rowType,
-      tableSourceTable.isStreamingMode)
+    val fieldIndexes = computeIndexMapping()
 
-    val inputDataType = inputTransform.getOutputType
     val producedDataType = tableSource.getProducedDataType
 
     // check that declared and actual type of table source DataStream are identical
-    if (inputDataType != TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(producedDataType)) {
+    if (inputTransform.getOutputType !=
+        TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(producedDataType)) {
       throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " +
         s"returned a DataStream of data type $producedDataType that does not match with the " +
         s"data type $producedDataType declared by the TableSource.getProducedDataType() method. " +
@@ -120,11 +118,16 @@ class StreamExecTableSourceScan(
     }
 
     // get expression to extract rowtime attribute
-    val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
+    val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeAttributeDescriptor(
       tableSource,
-      tableSourceTable.getRowType,
-      cluster,
-      planner.getRelBuilder
+      tableSourceTable.getRowType
+    ).map(desc =>
+      TableSourceUtil.getRowtimeExtractionExpression(
+        desc.getTimestampExtractor,
+        producedDataType,
+        planner.getRelBuilder,
+        nameMapping
+      )
     )
 
     val streamTransformation = if (needInternalConversion) {
@@ -142,7 +145,7 @@ class StreamExecTableSourceScan(
       // Code generation requires the correct precision of input fields.
       val fixedProducedDataType = TableSourceUtil.fixPrecisionForProducedDataType(
         tableSource,
-        rowType)
+        FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType))
       val conversionTransform = ScanUtil.convertToInternalRow(
         ctx,
         inputTransform.asInstanceOf[Transformation[Any]],
@@ -189,10 +192,7 @@ class StreamExecTableSourceScan(
   }
 
   private def needInternalConversion: Boolean = {
-    val fieldIndexes = TableSourceUtil.computeIndexMapping(
-      tableSource,
-      FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType),
-      tableSourceTable.isStreamingMode)
+    val fieldIndexes = computeIndexMapping()
     ScanUtil.hasTimeAttributeField(fieldIndexes) ||
       ScanUtil.needsConversion(tableSource.getProducedDataType)
   }
@@ -205,6 +205,24 @@ class StreamExecTableSourceScan(
     // The disadvantage is that streaming not support multi-paths.
     env.createInput(format, t).name(tableSource.explainSource()).getTransformation
   }
+
+  private def computeIndexMapping()
+    : Array[Int] = {
+    TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
+      tableSource,
+      FlinkTypeFactory.toTableSchema(getRowType).getTableColumns,
+      true,
+      nameMapping
+    )
+  }
+
+  private lazy val nameMapping: JFunction[String, String] = tableSource match {
+    case mapping: DefinedFieldMapping if mapping.getFieldMapping != null =>
+      new JFunction[String, String] {
+        override def apply(t: String): String = mapping.getFieldMapping.get(t)
+      }
+    case _ => JFunction.identity()
+  }
 }
 
 /**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
index 245a4b3..9f34ae1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
@@ -20,120 +20,49 @@ package org.apache.flink.table.planner.sources
 
 import org.apache.flink.table.api.{DataTypes, TableSchema, ValidationException, WatermarkSpec}
 import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, valueLiteral}
-import org.apache.flink.table.expressions.{CallExpression, ResolvedExpression, ResolvedFieldReference}
+import org.apache.flink.table.expressions.{CallExpression, Expression, ResolvedExpression, ResolvedFieldReference}
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.expressions.converter.ExpressionConverter
-import org.apache.flink.table.runtime.types.PlannerTypeUtils.isAssignable
-import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
-import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo
 import org.apache.flink.table.runtime.types.DataTypePrecisionFixer
-import org.apache.flink.table.sources.{DefinedFieldMapping, DefinedProctimeAttribute, DefinedRowtimeAttributes, RowtimeAttributeDescriptor, TableSource}
+import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
+import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
+import org.apache.flink.table.sources._
+import org.apache.flink.table.sources.tsextractors.{TimestampExtractor, TimestampExtractorUtils}
 import org.apache.flink.table.types.DataType
-import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot, RowType, TimestampKind, TimestampType, TinyIntType}
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
-import com.google.common.collect.ImmutableList
+import org.apache.flink.table.types.logical.RowType.RowField
+import org.apache.flink.table.types.logical._
+
 import org.apache.calcite.plan.RelOptCluster
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.logical.LogicalValues
-import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.table.types.logical.RowType.RowField
 
-import java.sql.Timestamp
+import _root_.java.sql.Timestamp
+import _root_.java.util.function.{Function => JFunction}
 
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
+import _root_.scala.collection.JavaConversions._
+import _root_.scala.collection.JavaConverters._
 
 /** Util class for [[TableSource]]. */
 object TableSourceUtil {
 
   /**
-    * Computes the indices that map the input type of the DataStream to the schema of the table.
-    *
-    * The mapping is based on the field names and fails if a table field cannot be
-    * mapped to a field of the input type.
-    *
-    * @param tableSource The table source for which the table schema is mapped to the input type.
-    * @param rowType Table source table row type
-    * @param isStreamTable If this table source is in streaming mode
-    * @return An index mapping from input type to table schema.
-    */
-  def computeIndexMapping(
-      tableSource: TableSource[_],
-      rowType: RowType,
-      isStreamTable: Boolean): Array[Int] = {
-
-    // get rowtime and proctime attributes
-    val rowtimeAttributes = getRowtimeAttributes(tableSource)
-    val proctimeAttributes = getProctimeAttribute(tableSource)
-    // compute mapping of selected fields and time attributes
-    val names = rowType.getFieldNames.toArray
-    val fieldTypes = rowType
-      .getFields
-      .map(_.getType)
-      .toArray
-    val mapping: Array[Int] = fieldTypes.zip(names).map {
-      case (_: TimestampType, name: String)
-        if proctimeAttributes.contains(name) =>
-        if (isStreamTable) {
-          TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER
-        } else {
-          TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER
-        }
-      case (_: TimestampType, name: String)
-        if rowtimeAttributes.contains(name) =>
-        if (isStreamTable) {
-          TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER
-        } else {
-          TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER
-        }
-      case (t: LogicalType, name: String) =>
-        // check if field is registered as time indicator
-        if (proctimeAttributes.contains(name)) {
-          throw new ValidationException(s"Processing time field '$name' has invalid type $t. " +
-            s"Processing time attributes must be of TimestampType.")
-        }
-        if (rowtimeAttributes.contains(name)) {
-          throw new ValidationException(s"Rowtime field '$name' has invalid type $t. " +
-            s"Rowtime attributes must be of TimestampType.")
-        }
-        val (physicalName, idx, logicalType) = resolveInputField(name, tableSource)
-        // validate that mapped fields are are same type
-        if (!isAssignable(logicalType, t)) {
-          throw new ValidationException(s"Type $t of table field '$name' does not " +
-            s"match with type $logicalType of the field '$physicalName' of the " +
-            "TableSource return type.")
-        }
-        idx
-    }
-
-    val inputType = fromDataTypeToLogicalType(tableSource.getProducedDataType)
-
-    // ensure that only one field is mapped to an atomic type
-    if (!(inputType.getTypeRoot == LogicalTypeRoot.ROW) && mapping.count(_ >= 0) > 1) {
-      throw new ValidationException(
-        s"More than one table field matched to atomic input type $inputType.")
-    }
-
-    mapping
-  }
-
-
-  /**
-    * Fixes the precision of [[TableSource#getProducedDataType()]] with the given logical schema
-    * type. The precision of producedDataType may lose, because the data type may comes from
-    * legacy type (e.g. Types.BIG_DEC). However, the precision is important to convert output of
-    * source to internal row.
-    *
-    * @param tableSource the table source
-    * @param logicalSchema the logical schema from DDL which carries the correct precisions
-    * @return the produced data type with correct precisions.
-    */
+   * Fixes the precision of [[TableSource#getProducedDataType()]] with the given logical schema
+   * type. The precision of the producedDataType may be lost, because the data type may come from
+   * a legacy type (e.g. Types.BIG_DEC). However, the precision is important when converting the
+   * output of the source to an internal row.
+   *
+   * @param tableSource the table source
+   * @param logicalSchema the logical schema from DDL which carries the correct precisions
+   * @return the produced data type with correct precisions.
+   */
   def fixPrecisionForProducedDataType(
       tableSource: TableSource[_],
-      logicalSchema: RowType): DataType = {
+      logicalSchema: RowType)
+    : DataType = {
 
     // remove proctime field from logical schema, because proctime is not in produced data type
     val schemaWithoutProctime = getProctimeAttribute(tableSource) match {
@@ -344,76 +273,70 @@ object TableSourceUtil {
   }
 
   /**
-    * Obtains the [[RexNode]] expression to extract the rowtime timestamp for a [[TableSource]].
-    *
-    * @param tableSource The [[TableSource]] for which the expression is extracted.
-    * @param rowType The table source table row type
-    * @param cluster The [[RelOptCluster]] of the current optimization process.
-    * @param relBuilder The [[RelBuilder]] to build the [[RexNode]].
-    * @return The [[RexNode]] expression to extract the timestamp of the table source.
-    */
+   * Retrieves an expression to compute a rowtime attribute.
+   *
+   * @param extractor Timestamp extractor to construct an expression for.
+   * @param physicalInputType Physical input type that the timestamp extractor accesses.
+   * @param relBuilder  Builder needed to construct the resulting RexNode.
+   * @param nameMapping Additional remapping of a logical to a physical field name.
+   *                    TimestampExtractor works with logical names, but accesses physical
+   *                    fields
+   * @return The [[RexNode]] expression to extract the timestamp.
+   */
   def getRowtimeExtractionExpression(
-      tableSource: TableSource[_],
-      rowType: RelDataType,
-      cluster: RelOptCluster,
-      relBuilder: RelBuilder): Option[RexNode] = {
-
-    val typeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+      extractor: TimestampExtractor,
+      physicalInputType: DataType,
+      relBuilder: RelBuilder,
+      nameMapping: JFunction[String, String])
+    : RexNode = {
+    val accessedFields = TimestampExtractorUtils.getAccessedFields(
+      extractor,
+      physicalInputType,
+      nameMapping)
+
+    relBuilder.push(createSchemaRelNode(accessedFields, relBuilder.getCluster))
+    val expr = constructExpression(
+      extractor,
+      accessedFields
+    ).accept(new ExpressionConverter(relBuilder))
+    relBuilder.clear()
+    expr
+  }
 
-    /**
-      * Creates a RelNode with a schema that corresponds on the given fields
-      * Fields for which no information is available, will have default values.
-      */
-    def createSchemaRelNode(fields: Array[(String, Int, LogicalType)]): RelNode = {
-      val maxIdx = fields.map(_._2).max
-      val idxMap: Map[Int, (String, LogicalType)] = Map(
-        fields.map(f => f._2 ->(f._1, f._3)): _*)
-      val (physicalFields, physicalTypes) = (0 to maxIdx)
-        .map(i => idxMap.getOrElse(i, ("", new TinyIntType()))).unzip
-      val physicalSchema: RelDataType = typeFactory.buildRelNodeRowType(
+  private def createSchemaRelNode(
+      fields: Array[ResolvedFieldReference],
+      cluster: RelOptCluster)
+    : RelNode = {
+    val maxIdx = fields.map(_.fieldIndex()).max
+    val idxMap: Map[Int, (String, LogicalType)] = Map(
+      fields.map(f => f.fieldIndex() -> (f.name(), fromTypeInfoToLogicalType(f.resultType()))): _*)
+    val (physicalFields, physicalTypes) = (0 to maxIdx)
+      .map(i => idxMap.getOrElse(i, ("", new TinyIntType()))).unzip
+    val physicalSchema: RelDataType = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+      .buildRelNodeRowType(
         physicalFields,
         physicalTypes)
-      LogicalValues.create(
-        cluster,
-        physicalSchema,
-        ImmutableList.of().asInstanceOf[ImmutableList[ImmutableList[RexLiteral]]])
-    }
-
-    val rowtimeDesc = getRowtimeAttributeDescriptor(tableSource, rowType)
-    rowtimeDesc.map { r =>
-      val tsExtractor = r.getTimestampExtractor
-
-      val fieldAccesses: Array[ResolvedFieldReference] =
-        if (tsExtractor.getArgumentFields.nonEmpty) {
-          val resolvedFields = resolveInputFields(tsExtractor.getArgumentFields, tableSource)
-          // push an empty values node with the physical schema on the relbuilder
-          relBuilder.push(createSchemaRelNode(resolvedFields))
-          // get extraction expression
-          resolvedFields.map(
-            f => new ResolvedFieldReference(
-              f._1,
-              fromLogicalTypeToTypeInfo(f._3),
-              f._2))
-        } else {
-          new Array[ResolvedFieldReference](0)
-        }
+    LogicalValues.createEmpty(
+      cluster,
+      physicalSchema)
+  }
 
-      val expression = tsExtractor.getExpression(fieldAccesses)
-      // add cast to requested type and convert expression to RexNode
-      // blink runner treats numeric types as seconds in the cast of timestamp and numerical types.
-      // So we use REINTERPRET_CAST to keep the mills of numeric types.
-      val outputType = DataTypes.TIMESTAMP(3).bridgedTo(classOf[Timestamp])
-      val castExpression = new CallExpression(
-        BuiltInFunctionDefinitions.REINTERPRET_CAST,
-        Seq(
-          expression.asInstanceOf[ResolvedExpression],
-          typeLiteral(outputType),
-          valueLiteral(false)),
-        outputType)
-      val rexExpression = castExpression.accept(new ExpressionConverter(relBuilder))
-      relBuilder.clear()
-      rexExpression
-    }
+  private def constructExpression(
+      timestampExtractor: TimestampExtractor,
+      fieldAccesses: Array[ResolvedFieldReference])
+    : Expression = {
+    val expression = timestampExtractor.getExpression(fieldAccesses)
+    // add cast to requested type and convert expression to RexNode
+    // If resultType is TimeIndicatorTypeInfo, its internal format is long, but cast
+    // from Timestamp is java.sql.Timestamp. So we need cast to long first.
+    val outputType = DataTypes.TIMESTAMP(3).bridgedTo(classOf[Timestamp])
+    new CallExpression(
+      BuiltInFunctionDefinitions.REINTERPRET_CAST,
+      Seq(
+        expression.asInstanceOf[ResolvedExpression],
+        typeLiteral(outputType),
+        valueLiteral(false)),
+      outputType)
   }
 
   /** Returns a list with all rowtime attribute names of the [[TableSource]]. */
@@ -435,75 +358,4 @@ object TableSourceUtil {
         None
     }
   }
-
-  /**
-    * Identifies for a field name of the logical schema, the corresponding physical field in the
-    * return type of a [[TableSource]].
-    *
-    * @param fieldName The logical field to look up.
-    * @param tableSource The table source in which to look for the field.
-    * @return The name, index, and logical type of the physical field.
-    */
-  private def resolveInputField(
-      fieldName: String,
-      tableSource: TableSource[_]): (String, Int, LogicalType) = {
-
-    val returnType = fromDataTypeToLogicalType(tableSource.getProducedDataType)
-
-    /** Look up a field by name in a type information */
-    def lookupField(fieldName: String, failMsg: String): (String, Int, LogicalType) = {
-
-      returnType match {
-        case rt: RowType =>
-          // get and check field index
-          val idx = rt.getFieldIndex(fieldName)
-          if (idx < 0) {
-            throw new ValidationException(failMsg)
-          }
-
-          // return field name, index, and field type
-          (fieldName, idx, rt.getTypeAt(idx))
-        case _ =>
-          // no composite type, we return the full atomic type as field
-          (fieldName, 0, returnType)
-      }
-    }
-
-    tableSource match {
-      case d: DefinedFieldMapping if d.getFieldMapping != null =>
-        // resolve field name in field mapping
-        val resolvedFieldName = d.getFieldMapping.get(fieldName)
-        if (resolvedFieldName == null) {
-          throw new ValidationException(
-            s"Field '$fieldName' could not be resolved by the field mapping.")
-        }
-        // look up resolved field in return type
-        lookupField(
-          resolvedFieldName,
-          s"Table field '$fieldName' was resolved to TableSource return type field " +
-            s"'$resolvedFieldName', but field '$resolvedFieldName' was not found in the return " +
-            s"type $returnType of the TableSource. " +
-            s"Please verify the field mapping of the TableSource.")
-      case _ =>
-        // look up field in return type
-        lookupField(
-          fieldName,
-          s"Table field '$fieldName' was not found in the return type $returnType of the " +
-            s"TableSource.")
-    }
-  }
-
-  /**
-    * Identifies the physical fields in the return type of a [[TableSource]]
-    * for a list of field names of the [[TableSource]]'s [[org.apache.flink.table.api.TableSchema]].
-    *
-    * @param fieldNames The field names to look up.
-    * @param tableSource The table source in which to look for the field.
-    * @return The name, index, and logical type of the physical field.
-    */
-  private def resolveInputFields(
-      fieldNames: Array[String],
-      tableSource: TableSource[_]): Array[(String, Int, LogicalType)] = {
-    fieldNames.map(resolveInputField(_, tableSource))
-  }
 }
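
To close, a minimal sketch (again not part of the commit) of how the refactored pieces fit together in a scan node, using only names that appear above: the rowtime descriptor is looked up from the table row type, and only its TimestampExtractor, the produced data type, and the name mapping are handed to the new getRowtimeExtractionExpression.

    // Assembled from the scan-node hunks above; `tableSource`, `tableSourceTable`,
    // `planner` and `nameMapping` are the fields/vals of those nodes.
    val rowtimeExpression: Option[RexNode] =
      TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, tableSourceTable.getRowType)
        .map(descriptor =>
          TableSourceUtil.getRowtimeExtractionExpression(
            descriptor.getTimestampExtractor,   // works on logical field names
            tableSource.getProducedDataType,    // physical input the extractor reads
            planner.getRelBuilder,
            nameMapping))                       // resolves logical -> physical names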
