This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 69e7358063cb2225a1229d5869eae54d1a3905ad
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Dec 13 09:49:33 2019 +0100

    [FLINK-15168][table-planner-blink] Compute physical indices based on new type hierarchy instead of TypeInformation
---
 .../table/planner/calcite/FlinkTypeFactory.scala  |  13 +
 .../physical/batch/BatchExecTableSourceScan.scala |  50 +++-
 .../stream/StreamExecTableSourceScan.scala        |  52 ++--
 .../table/planner/sources/TableSourceUtil.scala   | 314 ++++++---------------
 4 files changed, 166 insertions(+), 263 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
index 5a5db1f..f092398 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
@@ -23,6 +23,8 @@ import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.table.api.{DataTypes, TableException, TableSchema}
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType
 import org.apache.flink.table.planner.plan.schema.{GenericRelDataType, _}
+import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter
+import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.logical._
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 import org.apache.flink.types.Nothing
@@ -485,6 +487,17 @@ object FlinkTypeFactory {
     logicalType.copy(relDataType.isNullable)
   }

+  def toTableSchema(relDataType: RelDataType): TableSchema = {
+    val fieldNames = relDataType.getFieldNames.toArray(new Array[String](0))
+    val fieldTypes = relDataType.getFieldList
+      .asScala
+      .map(field =>
+        LogicalTypeDataTypeConverter.fromLogicalTypeToDataType(
+          FlinkTypeFactory.toLogicalType(field.getType))
+      ).toArray
+    TableSchema.builder.fields(fieldNames, fieldTypes).build
+  }
+
   def toLogicalRowType(relType: RelDataType): RowType = {
     checkArgument(relType.isStruct)
     RowType.of(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
index 61080dc..38387c4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
@@ -37,16 +37,19 @@ import org.apache.flink.table.planner.plan.schema.TableSourceTable
 import org.apache.flink.table.planner.plan.utils.ScanUtil
 import org.apache.flink.table.planner.sources.TableSourceUtil
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter
-import org.apache.flink.table.sources.StreamTableSource
+import org.apache.flink.table.sources.{DefinedFieldMapping, StreamTableSource}
+import org.apache.flink.table.utils.TypeMappingUtils

 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rex.RexNode

+import java.util.function.{Function => JFunction}
 import java.{lang, util}

 import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._

 /**
   * Batch physical RelNode to read data from an external source defined by a
@@ -92,11 +95,7 @@ class BatchExecTableSourceScan(
     val config = planner.getTableConfig
     val inputTransform = getSourceTransformation(planner.getExecEnv)

-    val rowType = FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType)
-    val fieldIndexes = TableSourceUtil.computeIndexMapping(
-      tableSource,
-      rowType,
-      tableSourceTable.isStreamingMode)
+    val fieldIndexes = computeIndexMapping()

     val inputDataType = inputTransform.getOutputType
     val producedDataType = tableSource.getProducedDataType
@@ -110,19 +109,25 @@ class BatchExecTableSourceScan(
     }

     // get expression to extract rowtime attribute
-    val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
+    val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeAttributeDescriptor(
       tableSource,
-      tableSourceTable.getRowType,
-      cluster,
-      planner.getRelBuilder
+      tableSourceTable.getRowType
+    ).map(desc =>
+      TableSourceUtil.getRowtimeExtractionExpression(
+        desc.getTimestampExtractor,
+        producedDataType,
+        planner.getRelBuilder,
+        nameMapping
+      )
     )
+
     if (needInternalConversion) {
       // the produced type may not carry the correct precision user defined in DDL, because
       // it may be converted from legacy type. Fix precision using logical schema from DDL.
       // code generation requires the correct precision of input fields.
       val fixedProducedDataType = TableSourceUtil.fixPrecisionForProducedDataType(
         tableSource,
-        rowType)
+        FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType))
       ScanUtil.convertToInternalRow(
         CodeGeneratorContext(config),
         inputTransform.asInstanceOf[Transformation[Any]],
@@ -139,10 +144,7 @@ class BatchExecTableSourceScan(
   }

   def needInternalConversion: Boolean = {
-    val fieldIndexes = TableSourceUtil.computeIndexMapping(
-      tableSource,
-      FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType),
-      tableSourceTable.isStreamingMode)
+    val fieldIndexes = computeIndexMapping()
     ScanUtil.hasTimeAttributeField(fieldIndexes) ||
       ScanUtil.needsConversion(tableSource.getProducedDataType)
   }
@@ -163,4 +165,22 @@ class BatchExecTableSourceScan(
     ExecNode.setManagedMemoryWeight(env.addSource(func, tableSource.explainSource(), t)
       .getTransformation)
   }
+
+  private def computeIndexMapping()
+    : Array[Int] = {
+    TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
+      tableSource,
+      FlinkTypeFactory.toTableSchema(getRowType).getTableColumns,
+      false,
+      nameMapping
+    )
+  }
+
+  private lazy val nameMapping: JFunction[String, String] = tableSource match {
+    case mapping: DefinedFieldMapping if mapping.getFieldMapping != null =>
+      new JFunction[String, String] {
+        override def apply(t: String): String = mapping.getFieldMapping.get(t)
+      }
+    case _ => JFunction.identity()
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
index dbf5bfd..fa9d505 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
@@ -41,8 +41,9 @@ import org.apache.flink.table.planner.sources.TableSourceUtil
 import org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter
 import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PreserveWatermarks, PunctuatedWatermarkAssigner}
-import org.apache.flink.table.sources.{RowtimeAttributeDescriptor, StreamTableSource}
+import org.apache.flink.table.sources.{DefinedFieldMapping, RowtimeAttributeDescriptor, StreamTableSource}
 import org.apache.flink.table.types.{DataType, FieldsDataType}
+import org.apache.flink.table.utils.TypeMappingUtils
 import org.apache.flink.types.Row

 import org.apache.calcite.plan._
@@ -51,6 +52,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rex.RexNode

 import java.util
+import java.util.function.{Function => JFunction}

 import scala.collection.JavaConversions._

@@ -102,17 +104,13 @@ class StreamExecTableSourceScan(
     val config = planner.getTableConfig
     val inputTransform = getSourceTransformation(planner.getExecEnv)

-    val rowType = FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType)
-    val fieldIndexes = TableSourceUtil.computeIndexMapping(
-      tableSource,
-      rowType,
-      tableSourceTable.isStreamingMode)
+    val fieldIndexes = computeIndexMapping()

-    val inputDataType = inputTransform.getOutputType
     val producedDataType = tableSource.getProducedDataType

     // check that declared and actual type of table source DataStream are identical
-    if (inputDataType != TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(producedDataType)) {
+    if (inputTransform.getOutputType !=
+      TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(producedDataType)) {
       throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " +
         s"returned a DataStream of data type $producedDataType that does not match with the " +
         s"data type $producedDataType declared by the TableSource.getProducedDataType() method. " +
@@ -120,11 +118,16 @@ class StreamExecTableSourceScan(
     }

     // get expression to extract rowtime attribute
-    val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
+    val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeAttributeDescriptor(
       tableSource,
-      tableSourceTable.getRowType,
-      cluster,
-      planner.getRelBuilder
+      tableSourceTable.getRowType
+    ).map(desc =>
+      TableSourceUtil.getRowtimeExtractionExpression(
+        desc.getTimestampExtractor,
+        producedDataType,
+        planner.getRelBuilder,
+        nameMapping
+      )
     )

     val streamTransformation = if (needInternalConversion) {
@@ -142,7 +145,7 @@ class StreamExecTableSourceScan(
       // Code generation requires the correct precision of input fields.
       val fixedProducedDataType = TableSourceUtil.fixPrecisionForProducedDataType(
         tableSource,
-        rowType)
+        FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType))
       val conversionTransform = ScanUtil.convertToInternalRow(
         ctx,
         inputTransform.asInstanceOf[Transformation[Any]],
@@ -189,10 +192,7 @@ class StreamExecTableSourceScan(
   }

   private def needInternalConversion: Boolean = {
-    val fieldIndexes = TableSourceUtil.computeIndexMapping(
-      tableSource,
-      FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType),
-      tableSourceTable.isStreamingMode)
+    val fieldIndexes = computeIndexMapping()
     ScanUtil.hasTimeAttributeField(fieldIndexes) ||
       ScanUtil.needsConversion(tableSource.getProducedDataType)
   }
@@ -205,6 +205,24 @@ class StreamExecTableSourceScan(
     // The disadvantage is that streaming not support multi-paths.
     env.createInput(format, t).name(tableSource.explainSource()).getTransformation
   }
+
+  private def computeIndexMapping()
+    : Array[Int] = {
+    TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
+      tableSource,
+      FlinkTypeFactory.toTableSchema(getRowType).getTableColumns,
+      true,
+      nameMapping
+    )
+  }
+
+  private lazy val nameMapping: JFunction[String, String] = tableSource match {
+    case mapping: DefinedFieldMapping if mapping.getFieldMapping != null =>
+      new JFunction[String, String] {
+        override def apply(t: String): String = mapping.getFieldMapping.get(t)
+      }
+    case _ => JFunction.identity()
+  }
 }

 /**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
index 245a4b3..9f34ae1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
@@ -20,120 +20,49 @@ package org.apache.flink.table.planner.sources

 import org.apache.flink.table.api.{DataTypes, TableSchema, ValidationException, WatermarkSpec}
 import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, valueLiteral}
-import org.apache.flink.table.expressions.{CallExpression, ResolvedExpression, ResolvedFieldReference}
+import org.apache.flink.table.expressions.{CallExpression, Expression, ResolvedExpression, ResolvedFieldReference}
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.expressions.converter.ExpressionConverter
-import org.apache.flink.table.runtime.types.PlannerTypeUtils.isAssignable
-import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
-import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo
 import org.apache.flink.table.runtime.types.DataTypePrecisionFixer
-import org.apache.flink.table.sources.{DefinedFieldMapping, DefinedProctimeAttribute, DefinedRowtimeAttributes, RowtimeAttributeDescriptor, TableSource}
+import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
+import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
+import org.apache.flink.table.sources._
+import org.apache.flink.table.sources.tsextractors.{TimestampExtractor, TimestampExtractorUtils}
 import org.apache.flink.table.types.DataType
-import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot, RowType, TimestampKind, TimestampType, TinyIntType}
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
-import com.google.common.collect.ImmutableList
+import org.apache.flink.table.types.logical.RowType.RowField
+import org.apache.flink.table.types.logical._
+
 import org.apache.calcite.plan.RelOptCluster
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.logical.LogicalValues
-import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.table.types.logical.RowType.RowField

-import java.sql.Timestamp
+import _root_.java.sql.Timestamp
+import _root_.java.util.function.{Function => JFunction}

-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
+import _root_.scala.collection.JavaConversions._
+import _root_.scala.collection.JavaConverters._

 /** Util class for [[TableSource]]. */
 object TableSourceUtil {

   /**
-   * Computes the indices that map the input type of the DataStream to the schema of the table.
-   *
-   * The mapping is based on the field names and fails if a table field cannot be
-   * mapped to a field of the input type.
-   *
-   * @param tableSource The table source for which the table schema is mapped to the input type.
-   * @param rowType Table source table row type
-   * @param isStreamTable If this table source is in streaming mode
-   * @return An index mapping from input type to table schema.
-   */
-  def computeIndexMapping(
-      tableSource: TableSource[_],
-      rowType: RowType,
-      isStreamTable: Boolean): Array[Int] = {
-
-    // get rowtime and proctime attributes
-    val rowtimeAttributes = getRowtimeAttributes(tableSource)
-    val proctimeAttributes = getProctimeAttribute(tableSource)
-    // compute mapping of selected fields and time attributes
-    val names = rowType.getFieldNames.toArray
-    val fieldTypes = rowType
-      .getFields
-      .map(_.getType)
-      .toArray
-    val mapping: Array[Int] = fieldTypes.zip(names).map {
-      case (_: TimestampType, name: String)
-        if proctimeAttributes.contains(name) =>
-        if (isStreamTable) {
-          TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER
-        } else {
-          TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER
-        }
-      case (_: TimestampType, name: String)
-        if rowtimeAttributes.contains(name) =>
-        if (isStreamTable) {
-          TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER
-        } else {
-          TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER
-        }
-      case (t: LogicalType, name: String) =>
-        // check if field is registered as time indicator
-        if (proctimeAttributes.contains(name)) {
-          throw new ValidationException(s"Processing time field '$name' has invalid type $t. " +
-            s"Processing time attributes must be of TimestampType.")
-        }
-        if (rowtimeAttributes.contains(name)) {
-          throw new ValidationException(s"Rowtime field '$name' has invalid type $t. " +
-            s"Rowtime attributes must be of TimestampType.")
-        }
-        val (physicalName, idx, logicalType) = resolveInputField(name, tableSource)
-        // validate that mapped fields are are same type
-        if (!isAssignable(logicalType, t)) {
-          throw new ValidationException(s"Type $t of table field '$name' does not " +
-            s"match with type $logicalType of the field '$physicalName' of the " +
-            "TableSource return type.")
-        }
-        idx
-    }
-
-    val inputType = fromDataTypeToLogicalType(tableSource.getProducedDataType)
-
-    // ensure that only one field is mapped to an atomic type
-    if (!(inputType.getTypeRoot == LogicalTypeRoot.ROW) && mapping.count(_ >= 0) > 1) {
-      throw new ValidationException(
-        s"More than one table field matched to atomic input type $inputType.")
-    }
-
-    mapping
-  }
-
-
-  /**
-   * Fixes the precision of [[TableSource#getProducedDataType()]] with the given logical schema
-   * type. The precision of producedDataType may lose, because the data type may comes from
-   * legacy type (e.g. Types.BIG_DEC). However, the precision is important to convert output of
-   * source to internal row.
-   *
-   * @param tableSource the table source
-   * @param logicalSchema the logical schema from DDL which carries the correct precisions
-   * @return the produced data type with correct precisions.
-   */
+    * Fixes the precision of [[TableSource#getProducedDataType()]] with the given logical schema
+    * type. The precision of producedDataType may lose, because the data type may comes from
+    * legacy type (e.g. Types.BIG_DEC). However, the precision is important to convert output of
+    * source to internal row.
+    *
+    * @param tableSource the table source
+    * @param logicalSchema the logical schema from DDL which carries the correct precisions
+    * @return the produced data type with correct precisions.
+    */
   def fixPrecisionForProducedDataType(
       tableSource: TableSource[_],
-      logicalSchema: RowType): DataType = {
+      logicalSchema: RowType)
+    : DataType = {

     // remove proctime field from logical schema, because proctime is not in produced data type
     val schemaWithoutProctime = getProctimeAttribute(tableSource) match {
@@ -344,76 +273,70 @@ object TableSourceUtil {
   }

   /**
-   * Obtains the [[RexNode]] expression to extract the rowtime timestamp for a [[TableSource]].
-   *
-   * @param tableSource The [[TableSource]] for which the expression is extracted.
-   * @param rowType The table source table row type
-   * @param cluster The [[RelOptCluster]] of the current optimization process.
-   * @param relBuilder The [[RelBuilder]] to build the [[RexNode]].
-   * @return The [[RexNode]] expression to extract the timestamp of the table source.
-   */
+    * Retrieves an expression to compute a rowtime attribute.
+    *
+    * @param extractor Timestamp extractor to construct an expression for.
+    * @param physicalInputType Physical input type that the timestamp extractor accesses.
+    * @param relBuilder Builder needed to construct the resulting RexNode.
+    * @param nameMapping Additional remapping of a logical to a physical field name.
+    *                    TimestampExtractor works with logical names, but accesses physical
+    *                    fields
+    * @return The [[RexNode]] expression to extract the timestamp.
+    */
   def getRowtimeExtractionExpression(
-      tableSource: TableSource[_],
-      rowType: RelDataType,
-      cluster: RelOptCluster,
-      relBuilder: RelBuilder): Option[RexNode] = {
-
-    val typeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+      extractor: TimestampExtractor,
+      physicalInputType: DataType,
+      relBuilder: RelBuilder,
+      nameMapping: JFunction[String, String])
+    : RexNode = {
+    val accessedFields = TimestampExtractorUtils.getAccessedFields(
+      extractor,
+      physicalInputType,
+      nameMapping)
+
+    relBuilder.push(createSchemaRelNode(accessedFields, relBuilder.getCluster))
+    val expr = constructExpression(
+      extractor,
+      accessedFields
+    ).accept(new ExpressionConverter(relBuilder))
+    relBuilder.clear()
+    expr
+  }

-    /**
-      * Creates a RelNode with a schema that corresponds on the given fields
-      * Fields for which no information is available, will have default values.
-      */
-    def createSchemaRelNode(fields: Array[(String, Int, LogicalType)]): RelNode = {
-      val maxIdx = fields.map(_._2).max
-      val idxMap: Map[Int, (String, LogicalType)] = Map(
-        fields.map(f => f._2 ->(f._1, f._3)): _*)
-      val (physicalFields, physicalTypes) = (0 to maxIdx)
-        .map(i => idxMap.getOrElse(i, ("", new TinyIntType()))).unzip
-      val physicalSchema: RelDataType = typeFactory.buildRelNodeRowType(
+  private def createSchemaRelNode(
+      fields: Array[ResolvedFieldReference],
+      cluster: RelOptCluster)
+    : RelNode = {
+    val maxIdx = fields.map(_.fieldIndex()).max
+    val idxMap: Map[Int, (String, LogicalType)] = Map(
+      fields.map(f => f.fieldIndex() -> (f.name(), fromTypeInfoToLogicalType(f.resultType()))): _*)
+    val (physicalFields, physicalTypes) = (0 to maxIdx)
+      .map(i => idxMap.getOrElse(i, ("", new TinyIntType()))).unzip
+    val physicalSchema: RelDataType = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+      .buildRelNodeRowType(
         physicalFields,
         physicalTypes)
-      LogicalValues.create(
-        cluster,
-        physicalSchema,
-        ImmutableList.of().asInstanceOf[ImmutableList[ImmutableList[RexLiteral]]])
-    }
-
-    val rowtimeDesc = getRowtimeAttributeDescriptor(tableSource, rowType)
-    rowtimeDesc.map { r =>
-      val tsExtractor = r.getTimestampExtractor
-
-      val fieldAccesses: Array[ResolvedFieldReference] =
-        if (tsExtractor.getArgumentFields.nonEmpty) {
-          val resolvedFields = resolveInputFields(tsExtractor.getArgumentFields, tableSource)
-          // push an empty values node with the physical schema on the relbuilder
-          relBuilder.push(createSchemaRelNode(resolvedFields))
-          // get extraction expression
-          resolvedFields.map(
-            f => new ResolvedFieldReference(
-              f._1,
-              fromLogicalTypeToTypeInfo(f._3),
-              f._2))
-        } else {
-          new Array[ResolvedFieldReference](0)
-        }
+    LogicalValues.createEmpty(
+      cluster,
+      physicalSchema)
+  }

-      val expression = tsExtractor.getExpression(fieldAccesses)
-      // add cast to requested type and convert expression to RexNode
-      // blink runner treats numeric types as seconds in the cast of timestamp and numerical types.
-      // So we use REINTERPRET_CAST to keep the mills of numeric types.
-      val outputType = DataTypes.TIMESTAMP(3).bridgedTo(classOf[Timestamp])
-      val castExpression = new CallExpression(
-        BuiltInFunctionDefinitions.REINTERPRET_CAST,
-        Seq(
-          expression.asInstanceOf[ResolvedExpression],
-          typeLiteral(outputType),
-          valueLiteral(false)),
-        outputType)
-      val rexExpression = castExpression.accept(new ExpressionConverter(relBuilder))
-      relBuilder.clear()
-      rexExpression
-    }
+  private def constructExpression(
+      timestampExtractor: TimestampExtractor,
+      fieldAccesses: Array[ResolvedFieldReference])
+    : Expression = {
+    val expression = timestampExtractor.getExpression(fieldAccesses)
+    // add cast to requested type and convert expression to RexNode
+    // If resultType is TimeIndicatorTypeInfo, its internal format is long, but cast
+    // from Timestamp is java.sql.Timestamp. So we need cast to long first.
+    val outputType = DataTypes.TIMESTAMP(3).bridgedTo(classOf[Timestamp])
+    new CallExpression(
+      BuiltInFunctionDefinitions.REINTERPRET_CAST,
+      Seq(
+        expression.asInstanceOf[ResolvedExpression],
+        typeLiteral(outputType),
+        valueLiteral(false)),
+      outputType)
   }

   /** Returns a list with all rowtime attribute names of the [[TableSource]]. */
@@ -435,75 +358,4 @@
       None
     }
   }
-
-  /**
-   * Identifies for a field name of the logical schema, the corresponding physical field in the
-   * return type of a [[TableSource]].
-   *
-   * @param fieldName The logical field to look up.
-   * @param tableSource The table source in which to look for the field.
-   * @return The name, index, and logical type of the physical field.
-   */
-  private def resolveInputField(
-      fieldName: String,
-      tableSource: TableSource[_]): (String, Int, LogicalType) = {
-
-    val returnType = fromDataTypeToLogicalType(tableSource.getProducedDataType)
-
-    /** Look up a field by name in a type information */
-    def lookupField(fieldName: String, failMsg: String): (String, Int, LogicalType) = {
-
-      returnType match {
-        case rt: RowType =>
-          // get and check field index
-          val idx = rt.getFieldIndex(fieldName)
-          if (idx < 0) {
-            throw new ValidationException(failMsg)
-          }
-
-          // return field name, index, and field type
-          (fieldName, idx, rt.getTypeAt(idx))
-        case _ =>
-          // no composite type, we return the full atomic type as field
-          (fieldName, 0, returnType)
-      }
-    }
-
-    tableSource match {
-      case d: DefinedFieldMapping if d.getFieldMapping != null =>
-        // resolve field name in field mapping
-        val resolvedFieldName = d.getFieldMapping.get(fieldName)
-        if (resolvedFieldName == null) {
-          throw new ValidationException(
-            s"Field '$fieldName' could not be resolved by the field mapping.")
-        }
-        // look up resolved field in return type
-        lookupField(
-          resolvedFieldName,
-          s"Table field '$fieldName' was resolved to TableSource return type field " +
-            s"'$resolvedFieldName', but field '$resolvedFieldName' was not found in the return " +
-            s"type $returnType of the TableSource. " +
-            s"Please verify the field mapping of the TableSource.")
-      case _ =>
-        // look up field in return type
-        lookupField(
-          fieldName,
-          s"Table field '$fieldName' was not found in the return type $returnType of the " +
-            s"TableSource.")
-    }
-  }
-
-  /**
-   * Identifies the physical fields in the return type of a [[TableSource]]
-   * for a list of field names of the [[TableSource]]'s [[org.apache.flink.table.api.TableSchema]].
-   *
-   * @param fieldNames The field names to look up.
-   * @param tableSource The table source in which to look for the field.
-   * @return The name, index, and logical type of the physical field.
-   */
-  private def resolveInputFields(
-      fieldNames: Array[String],
-      tableSource: TableSource[_]): Array[(String, Int, LogicalType)] = {
-    fieldNames.map(resolveInputField(_, tableSource))
-  }
 }
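For readers following the change, a minimal standalone sketch of how the new index computation introduced above fits together. It only reuses APIs that appear in the patch (TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers, DefinedFieldMapping, FlinkTypeFactory.toTableSchema); the object name PhysicalIndicesSketch, the helper physicalIndices, and the way source, schema and streaming are obtained are illustrative assumptions, not part of the commit:

    import java.util.function.{Function => JFunction}

    import org.apache.flink.table.api.TableSchema
    import org.apache.flink.table.sources.{DefinedFieldMapping, TableSource}
    import org.apache.flink.table.utils.TypeMappingUtils

    object PhysicalIndicesSketch {

      // `source` is any TableSource; `schema` is the logical schema of the scan,
      // e.g. derived via FlinkTypeFactory.toTableSchema(relDataType) as in the patch.
      def physicalIndices(
          source: TableSource[_],
          schema: TableSchema,
          streaming: Boolean): Array[Int] = {

        // Logical-to-physical field name remapping; identity unless the source
        // implements DefinedFieldMapping (mirrors the new `nameMapping` vals above).
        val nameMapping: JFunction[String, String] = source match {
          case m: DefinedFieldMapping if m.getFieldMapping != null =>
            new JFunction[String, String] {
              override def apply(name: String): String = m.getFieldMapping.get(name)
            }
          case _ => JFunction.identity()
        }

        // Physical index (or time-attribute marker) per logical column, computed
        // from the new LogicalType-based hierarchy instead of TypeInformation.
        TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
          source,
          schema.getTableColumns,
          streaming,
          nameMapping)
      }
    }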
