[https://issues.apache.org/jira/browse/FLINK-17600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17103836#comment-17103836]
Yuval Itzchakov commented on FLINK-17600:
-----------------------------------------
Related?
[https://github.com/apache/flink/pull/11204/files]
> Blink Planner fails to generate RowtimeAttribute based on TableSource's DefinedRowtimeAttributes implementation
> ---------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-17600
> URL: https://issues.apache.org/jira/browse/FLINK-17600
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.10.0
> Reporter: Yuval Itzchakov
> Priority: Major
>
> Given the following SQL statement:
> {code:java}
> tableEnv.sqlQuery("SELECT EVENT_TIME, B, C FROM FOO"){code}
> Where FOO is a table originating from a custom StreamTableSource[Row] that
> implements `DefinedRowtimeAttributes.getRowtimeAttributeDescriptors`, the Blink
> planner fails to mark the selected field as a rowtime attribute.
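> For context, a minimal sketch of such a source (the class name, schema, and the
> trivial empty `getDataStream` body are illustrative stand-ins for my actual
> source, not its real code):
> {code:scala}
> import java.util
> import java.util.Collections
>
> import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
> import org.apache.flink.streaming.api.datastream.DataStream
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.table.api.{DataTypes, TableSchema}
> import org.apache.flink.table.sources.{DefinedRowtimeAttributes, RowtimeAttributeDescriptor, StreamTableSource}
> import org.apache.flink.table.sources.tsextractors.ExistingField
> import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps
> import org.apache.flink.types.Row
>
> class FooTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {
>
>   private val rowType: TypeInformation[Row] = Types.ROW_NAMED(
>     Array("EVENT_TIME", "B", "C"),
>     Types.SQL_TIMESTAMP, Types.STRING, Types.LONG)
>
>   // Declare EVENT_TIME as the rowtime attribute.
>   override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] =
>     Collections.singletonList(new RowtimeAttributeDescriptor(
>       "EVENT_TIME", new ExistingField("EVENT_TIME"), new AscendingTimestamps))
>
>   override def getTableSchema: TableSchema = TableSchema.builder()
>     .field("EVENT_TIME", DataTypes.TIMESTAMP(3))
>     .field("B", DataTypes.STRING())
>     .field("C", DataTypes.BIGINT())
>     .build()
>
>   override def getReturnType: TypeInformation[Row] = rowType
>
>   // An empty stream is enough to reproduce the planning-time behavior.
>   override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] =
>     execEnv.fromCollection(Collections.emptyList[Row](), rowType)
> }
> {code}
> The source is registered with something like
> `tableEnv.registerTableSource("FOO", new FooTableSource)` before the query runs.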
> This happens because the implementation of `TableSourceUtil.getSourceRowType`
> receives a `None` TableSource from `CatalogSchemaTable.getRowType`, presumably
> because the catalog has not yet created the underlying TableSource; its creation
> is deferred to the implementing TableFactory (in this case, my own custom one).
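> The symptom can be observed directly on the resulting schema (a sketch; the
> exact printed format may differ slightly by version):
> {code:scala}
> val table = tableEnv.sqlQuery("SELECT EVENT_TIME, B, C FROM FOO")
> table.printSchema()
> // Blink planner (buggy):   |-- EVENT_TIME: TIMESTAMP(3)
> // old planner (expected):  |-- EVENT_TIME: TIMESTAMP(3) *ROWTIME*
> {code}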
> *This does not reproduce in the old Flink planner*, because the old planner
> uses `TableSourceTable`, which explicitly holds a reference to the underlying
> `TableSource` and extracts its rowtime attributes.
> Relevant code:
> *CatalogSchemaTable*:
>
> {code:java}
> private static RelDataType getRowType(
>         RelDataTypeFactory typeFactory,
>         CatalogBaseTable catalogBaseTable,
>         boolean isStreamingMode) {
>     final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory;
>     TableSchema tableSchema = catalogBaseTable.getSchema();
>     final DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
>     if (!isStreamingMode
>             && catalogBaseTable instanceof ConnectorCatalogTable
>             && ((ConnectorCatalogTable) catalogBaseTable).getTableSource().isPresent()) {
>         // If the table source is bounded, materialize the time attributes to normal TIMESTAMP type.
>         // Now for ConnectorCatalogTable, there is no way to deduce if it is bounded
>         // in the table environment, so the data types in TableSchema always patched
>         // with TimeAttribute. See ConnectorCatalogTable#calculateSourceSchema for details.
>         // Remove the patched time attributes type to let the TableSourceTable handle it.
>         // We should remove this logic if the isBatch flag in ConnectorCatalogTable is fixed.
>         // TODO: Fix FLINK-14844.
>         for (int i = 0; i < fieldDataTypes.length; i++) {
>             LogicalType lt = fieldDataTypes[i].getLogicalType();
>             if (lt instanceof TimestampType
>                     && (((TimestampType) lt).getKind() == TimestampKind.PROCTIME
>                         || ((TimestampType) lt).getKind() == TimestampKind.ROWTIME)) {
>                 int precision = ((TimestampType) lt).getPrecision();
>                 fieldDataTypes[i] = DataTypes.TIMESTAMP(precision);
>             }
>         }
>     }
>     return TableSourceUtil.getSourceRowType(
>             flinkTypeFactory,
>             tableSchema,
>             scala.Option.empty(),
>             isStreamingMode);
> }
> {code}
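> The `scala.Option.empty()` in the return statement above is exactly where the
> table source is dropped. Purely as an illustration of the point (I don't know
> whether this is what the linked PR does, and the real class is Java; this is
> sketched in Scala for brevity), the `TableSource` already held by a
> `ConnectorCatalogTable` could be threaded through instead:
> {code:scala}
> // Illustrative only, not the actual fix: pass the TableSource through
> // when the ConnectorCatalogTable already carries one.
> val tableSource: Option[TableSource[_]] = catalogBaseTable match {
>   case c: ConnectorCatalogTable[_, _] if c.getTableSource.isPresent =>
>     Some(c.getTableSource.get)
>   case _ => None
> }
> TableSourceUtil.getSourceRowType(flinkTypeFactory, tableSchema, tableSource, isStreamingMode)
> {code}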
> *TableSourceUtil:*
>
> {code:scala}
> def getSourceRowType(
>     typeFactory: FlinkTypeFactory,
>     tableSchema: TableSchema,
>     tableSource: Option[TableSource[_]],
>     streaming: Boolean): RelDataType = {
>
>   val fieldNames = tableSchema.getFieldNames
>   val fieldDataTypes = tableSchema.getFieldDataTypes
>
>   if (tableSchema.getWatermarkSpecs.nonEmpty) {
>     getSourceRowType(typeFactory, fieldNames, fieldDataTypes,
>       tableSchema.getWatermarkSpecs.head, streaming)
>   } else if (tableSource.isDefined) {
>     getSourceRowType(typeFactory, fieldNames, fieldDataTypes,
>       tableSource.get, streaming)
>   } else {
>     val fieldTypes = fieldDataTypes.map(fromDataTypeToLogicalType)
>     typeFactory.buildRelNodeRowType(fieldNames, fieldTypes)
>   }
> }
> {code}
> *TableSourceTable:*
> {code:scala}
> // We must enrich logical schema from catalog table with physical type coming from table source.
> // Schema coming from catalog table might not have proper conversion classes. Those must be
> // extracted from produced type, before converting to RelDataType
> def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
>   val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
>   val fieldNames = tableSchema.getFieldNames
>
>   val nameMapping: JFunction[String, String] = tableSource match {
>     case mapping: DefinedFieldMapping if mapping.getFieldMapping != null =>
>       new JFunction[String, String] {
>         override def apply(t: String): String = mapping.getFieldMapping.get(t)
>       }
>     case _ => JFunction.identity()
>   }
>
>   val producedDataType = tableSource.getProducedDataType
>   val fieldIndexes = TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
>     tableSource,
>     tableSchema.getTableColumns,
>     isStreamingMode,
>     nameMapping
>   )
>
>   val typeInfos = if (LogicalTypeChecks.isCompositeType(producedDataType.getLogicalType)) {
>     val physicalSchema = DataTypeUtils.expandCompositeTypeToSchema(producedDataType)
>     fieldIndexes.map(mapIndex(_,
>       idx => TypeConversions.fromDataTypeToLegacyInfo(physicalSchema.getFieldDataType(idx).get())))
>   } else {
>     fieldIndexes.map(mapIndex(_, _ => TypeConversions.fromDataTypeToLegacyInfo(producedDataType)))
>   }
>
>   flinkTypeFactory.buildLogicalRowType(fieldNames, typeInfos)
> }
> {code}
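>
> Note that the first branch of `getSourceRowType` above honors
> `tableSchema.getWatermarkSpecs` without needing a `TableSource` at all, so
> declaring the table via DDL with a WATERMARK clause may sidestep the problem.
> A hedged workaround sketch (the connector properties are placeholders, not a
> working configuration):
> {code:scala}
> // A DDL-defined watermark populates tableSchema.getWatermarkSpecs, which
> // getSourceRowType checks before it ever looks at the TableSource, so the
> // rowtime attribute should survive planning. Placeholder WITH properties.
> tableEnv.sqlUpdate(
>   """
>     |CREATE TABLE FOO (
>     |  EVENT_TIME TIMESTAMP(3),
>     |  B STRING,
>     |  C BIGINT,
>     |  WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '5' SECOND
>     |) WITH (
>     |  'connector.type' = '...'
>     |)
>     |""".stripMargin)
> {code}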
>