Yuval Itzchakov created FLINK-17600:
---------------------------------------
Summary: Blink Planner fails to generate RowtimeAttribute based on
TableSource's DefinedRowtimeAttributes implementation
Key: FLINK-17600
URL: https://issues.apache.org/jira/browse/FLINK-17600
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.10.0
Reporter: Yuval Itzchakov
Given the following SQL statement:
{code:java}
tableEnv.sqlQuery("SELECT EVENT_TIME, B, C FROM FOO"){code}
and a custom StreamTableSource[Row] which implements
`DefinedRowtimeAttributes.getRowtimeAttributeDescriptors`, the Blink planner fails
to mark the selected field (EVENT_TIME) as a `RowtimeAttribute`.
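For context, a minimal sketch of such a source. The class name `FooTableSource`, the (EVENT_TIME, B, C) schema, and the ascending-watermark strategy are illustrative assumptions, not the actual source from my setup:
{code:java}
import java.util.{Collections, List => JList}

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, TableSchema}
import org.apache.flink.table.sources.tsextractors.ExistingField
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps
import org.apache.flink.table.sources.{DefinedRowtimeAttributes, RowtimeAttributeDescriptor, StreamTableSource}
import org.apache.flink.types.Row

// Hypothetical stand-in for my custom source: a StreamTableSource[Row] that
// declares EVENT_TIME as a rowtime attribute via DefinedRowtimeAttributes.
class FooTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

  // Declare EVENT_TIME as the rowtime attribute, extracted from the existing
  // field of the same name, with ascending watermarks.
  override def getRowtimeAttributeDescriptors: JList[RowtimeAttributeDescriptor] =
    Collections.singletonList(new RowtimeAttributeDescriptor(
      "EVENT_TIME", new ExistingField("EVENT_TIME"), new AscendingTimestamps))

  override def getTableSchema: TableSchema =
    TableSchema.builder()
      .field("EVENT_TIME", DataTypes.TIMESTAMP(3))
      .field("B", DataTypes.STRING())
      .field("C", DataTypes.INT())
      .build()

  override def getReturnType: TypeInformation[Row] =
    Types.ROW_NAMED(
      Array("EVENT_TIME", "B", "C"),
      Types.SQL_TIMESTAMP, Types.STRING, Types.INT)

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] =
    // Single dummy row; a real source would read from an external system.
    execEnv.fromCollection(
      Collections.singletonList(Row.of(new java.sql.Timestamp(0L), "b", Integer.valueOf(1))),
      getReturnType)
}
{code}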
This happens because `TableSourceUtil.getSourceRowType`'s implementation
receives a `None` TableSource from `CatalogSchemaTable.getRowType`, presumably
because the Catalog has not yet created the underlying TableSource; its creation
is deferred to the implementing TableFactory (in this case my own custom one).
*This does not reproduce in the old Flink planner*, because the old planner
uses `TableSourceTable`, which explicitly holds a reference to the underlying
`TableSource` and extracts its rowtime attributes from it.
Relevant code:
*CatalogSchemaTable*:
{code:java}
private static RelDataType getRowType(RelDataTypeFactory typeFactory,
        CatalogBaseTable catalogBaseTable,
        boolean isStreamingMode) {
    final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory;
    TableSchema tableSchema = catalogBaseTable.getSchema();
    final DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
    if (!isStreamingMode
            && catalogBaseTable instanceof ConnectorCatalogTable
            && ((ConnectorCatalogTable) catalogBaseTable).getTableSource().isPresent()) {
        // If the table source is bounded, materialize the time attributes to normal TIMESTAMP type.
        // Now for ConnectorCatalogTable, there is no way to
        // deduce if it is bounded in the table environment, so the data types in TableSchema
        // are always patched with TimeAttribute.
        // See ConnectorCatalogTable#calculateSourceSchema for details.
        // Remove the patched time attributes type to let the TableSourceTable handle it.
        // We should remove this logic if the isBatch flag in ConnectorCatalogTable is fixed.
        // TODO: Fix FLINK-14844.
        for (int i = 0; i < fieldDataTypes.length; i++) {
            LogicalType lt = fieldDataTypes[i].getLogicalType();
            if (lt instanceof TimestampType
                    && (((TimestampType) lt).getKind() == TimestampKind.PROCTIME
                        || ((TimestampType) lt).getKind() == TimestampKind.ROWTIME)) {
                int precision = ((TimestampType) lt).getPrecision();
                fieldDataTypes[i] = DataTypes.TIMESTAMP(precision);
            }
        }
    }
    return TableSourceUtil.getSourceRowType(flinkTypeFactory,
        tableSchema,
        scala.Option.empty(),
        isStreamingMode);
}
{code}
*TableSourceUtil:*
{code:java}
def getSourceRowType(
    typeFactory: FlinkTypeFactory,
    tableSchema: TableSchema,
    tableSource: Option[TableSource[_]],
    streaming: Boolean): RelDataType = {

  val fieldNames = tableSchema.getFieldNames
  val fieldDataTypes = tableSchema.getFieldDataTypes

  if (tableSchema.getWatermarkSpecs.nonEmpty) {
    getSourceRowType(typeFactory, fieldNames, fieldDataTypes,
      tableSchema.getWatermarkSpecs.head, streaming)
  } else if (tableSource.isDefined) {
    getSourceRowType(typeFactory, fieldNames, fieldDataTypes,
      tableSource.get, streaming)
  } else {
    val fieldTypes = fieldDataTypes.map(fromDataTypeToLogicalType)
    typeFactory.buildRelNodeRowType(fieldNames, fieldTypes)
  }
}
{code}
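To make the failure mode concrete: since `CatalogSchemaTable` always passes `scala.Option.empty()` and my schema has no watermark specs, execution falls through to the final `else` branch above and the rowtime attribute is silently dropped. Below is a minimal driver sketch; it registers the `FooTableSource` sketched earlier directly via `registerTableSource` rather than through a TableFactory (my actual setup), on the assumption that, judging from `CatalogSchemaTable.getRowType` above, the row type is derived through the same path either way:
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.java.StreamTableEnvironment

object RowtimeRepro {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // FooTableSource is the hypothetical source sketched earlier in this report.
    tableEnv.registerTableSource("FOO", new FooTableSource)

    // Expected: EVENT_TIME printed as TIMESTAMP(3) *ROWTIME*.
    // Observed with the Blink planner: a plain TIMESTAMP(3).
    tableEnv.sqlQuery("SELECT EVENT_TIME, B, C FROM FOO").printSchema()
  }
}
{code}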
*TableSourceTable:*
{code:java}
// We must enrich logical schema from catalog table with physical type coming from table source.
// Schema coming from catalog table might not have proper conversion classes. Those must be
// extracted from produced type, before converting to RelDataType
def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
  val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
  val fieldNames = tableSchema.getFieldNames

  val nameMapping: JFunction[String, String] = tableSource match {
    case mapping: DefinedFieldMapping if mapping.getFieldMapping != null =>
      new JFunction[String, String] {
        override def apply(t: String): String = mapping.getFieldMapping.get(t)
      }
    case _ => JFunction.identity()
  }

  val producedDataType = tableSource.getProducedDataType
  val fieldIndexes = TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
    tableSource,
    tableSchema.getTableColumns,
    isStreamingMode,
    nameMapping
  )

  val typeInfos = if (LogicalTypeChecks.isCompositeType(producedDataType.getLogicalType)) {
    val physicalSchema = DataTypeUtils.expandCompositeTypeToSchema(producedDataType)
    fieldIndexes.map(mapIndex(_,
      idx => TypeConversions.fromDataTypeToLegacyInfo(physicalSchema.getFieldDataType(idx).get())
    ))
  } else {
    fieldIndexes.map(mapIndex(_, _ => TypeConversions.fromDataTypeToLegacyInfo(producedDataType)))
  }

  flinkTypeFactory.buildLogicalRowType(fieldNames, typeInfos)
}
{code}