[
https://issues.apache.org/jira/browse/FLINK-15574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17020124#comment-17020124
]
Benoit Hanotte commented on FLINK-15574:
----------------------------------------
Hello [~docete], I'll close the issue as we can indeed generate the DataType &
TableSchema instead of the TypeInfo. That fixes this issue but create another
one because the type of the row time attribute doesn't seem to be compatible
between the old planner and the new one (new planner takes a sql.Timestamp or
LocaDateTime, old planner requires a TimeIndicatorTypeInfo which is serialized
as a long).
As this is another issue, we will open another ticket once we have a better
understanding of the issue.
Thanks,
Benoit
> DataType to LogicalType conversion issue
> ----------------------------------------
>
> Key: FLINK-15574
> URL: https://issues.apache.org/jira/browse/FLINK-15574
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Runtime
> Reporter: Benoit Hanotte
> Priority: Major
> Labels: pull-request-available
> Attachments: 0001-FLINK-15574-Add-unit-test-to-reproduce-issue.patch
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> We seem to be encountering an issue with the conversion from DataType to
> LogicalType with the Blink planner (full stacktrace below):
> {code}
> org.apache.flink.table.api.ValidationException: Type
> LEGACY(BasicArrayTypeInfo<String>) of table field 'my_array' does not match
> with type BasicArrayTypeInfo<String> of the field 'my_array' of the
> TableSource return type.
> {code}
> It seems there exists 2 paths to do the conversion from DataType to
> LogicalType:
> 1. TypeConversions.fromLegacyInfoToDataType():
> used for instance when calling TableSchema.fromTypeInformation().
> 2. LogicalTypeDataTypeConverter.fromDataTypeToLogicalType():
> Deprecated but still used in TableSourceUtil and many other places.
> These 2 code paths can return a different LogicalType for the same input,
> leading to issues when the LogicalTypes are compared to ensure they are
> compatible. For instance, PlannerTypeUtils.isAssignable() returns false for
> a DataType created from BasicArrayTypeInfo (leading to the
> ValidationException above).
> The full stacktrace is the following:
> {code}
> org.apache.flink.table.api.ValidationException: Type
> LEGACY(BasicArrayTypeInfo<String>) of table field 'my_array' does not match
> with type BasicArrayTypeInfo<String> of the field 'my_array' of the
> TableSource return type.
> at
> org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
> at
> org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
> at
> org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion$$anonfun$2.apply(StreamExecUnion.scala:86)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion$$anonfun$2.apply(StreamExecUnion.scala:86)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion.translateToPlanInternal(StreamExecUnion.scala:85)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion.translateToPlanInternal(StreamExecUnion.scala:39)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion.translateToPlan(StreamExecUnion.scala:39)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:140)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:55)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlan(StreamExecGroupWindowAggregate.scala:55)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:140)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:55)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlan(StreamExecGroupWindowAggregate.scala:55)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecWindowJoin.translateToPlanInternal(StreamExecWindowJoin.scala:135)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecWindowJoin.translateToPlanInternal(StreamExecWindowJoin.scala:52)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecWindowJoin.translateToPlan(StreamExecWindowJoin.scala:52)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:140)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:55)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlan(StreamExecGroupWindowAggregate.scala:55)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:154)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
> at
> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:201)
> at
> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.scala:104)
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)