[
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089411#comment-17089411
]
Dawid Wysakowicz edited comment on FLINK-17313 at 4/22/20, 7:45 AM:
--------------------------------------------------------------------
I must admit I did not fully understand the change at first. I was not
aware that {{supportsAvoidingCast}} had been introduced in that method.
Now that I understand it, I think it's fine to add the bidirectional check.
I am still skeptical about allowing a {{DECIMAL}} type of any precision for the
legacy decimal type. Such conversions might require a cast.
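To illustrate the concern about precision, here is a minimal plain-Java sketch that models {{DECIMAL(p, s)}} with {{BigDecimal}}. It is not Flink code: the class {{DecimalCastSketch}} and the method {{fitsWithoutCast}} are hypothetical names, and treating the legacy decimal type as {{DECIMAL(38, 18)}} is an assumption made only for this example.

```java
import java.math.BigDecimal;

/** Hypothetical helper, not part of Flink: models DECIMAL(p, s) with BigDecimal. */
final class DecimalCastSketch {

    /**
     * Returns true if the value can be stored as DECIMAL(precision, scale)
     * as-is, i.e. without rounding digits away and without overflowing.
     */
    static boolean fitsWithoutCast(BigDecimal value, int precision, int scale) {
        // More significant fractional digits than the target scale: rounding needed.
        if (value.stripTrailingZeros().scale() > scale) {
            return false;
        }
        // Exact rescale: only pads or drops trailing zeros, never rounds.
        BigDecimal rescaled = value.setScale(scale);
        // Too many digits overall: the value overflows the target type.
        return rescaled.precision() <= precision;
    }

    public static void main(String[] args) {
        // Assumption for this sketch: the legacy decimal behaves like DECIMAL(38, 18).
        int legacyPrecision = 38;
        int legacyScale = 18;

        BigDecimal small = new BigDecimal("12345.67");                     // fits DECIMAL(20, 2)
        BigDecimal wide = new BigDecimal("1234567890123456789012345.67"); // needs DECIMAL(27, 2)

        System.out.println(fitsWithoutCast(small, legacyPrecision, legacyScale));
        System.out.println(fitsWithoutCast(wide, legacyPrecision, legacyScale));
    }
}
```

Under these assumptions, a value with 25 integer digits cannot be represented with scale 18 inside 38 digits, so accepting a {{DECIMAL}} of any precision would still need a cast or an overflow check somewhere.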
> Validation error when insert decimal/timestamp/varchar with precision into
> sink using TypeInformation of row
> ------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-17313
> URL: https://issues.apache.org/jira/browse/FLINK-17313
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Terry Wang
> Priority: Major
> Labels: pull-request-available
>
> Test code like the following (in the blink planner):
> {code:java}
> tEnv.sqlUpdate("create table randomSource (" +
>     " a varchar(10)," +
>     " b decimal(20,2)" +
>     " ) with (" +
>     " 'type' = 'random'," +
>     " 'count' = '10'" +
>     " )");
> tEnv.sqlUpdate("create table printSink (" +
>     " a varchar(10)," +
>     " b decimal(22,2)," +
>     " c timestamp(3)," +
>     " ) with (" +
>     " 'type' = 'print'" +
>     " )");
> tEnv.sqlUpdate("insert into printSink select *, current_timestamp from randomSource");
> tEnv.execute("");
> {code}
> Print TableSink implements UpsertStreamTableSink, and its getRecordType is as
> follows:
> {code:java}
> public TypeInformation<Row> getRecordType() {
>     return getTableSchema().toRowType();
> }
> {code}
> The validation exception for the varchar column is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table
> field 'a' does not match with the physical type STRING of the 'a' field of
> the TableSink consumed type.
> at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
> at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
> at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
> at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
> at org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
> at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
> at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
> at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
> at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
> at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
> at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
> at scala.Option.map(Option.scala:146)
> at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
> at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:863)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:855)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:822)
> The validation exceptions for the other types are similar. I dug into it and
> think they are caused by TypeMappingUtils#checkPhysicalLogicalTypeCompatible.
> It seems that the method doesn't account for the different physical/logical
> type validation logic of sources and sinks: in a source, the logical type
> should be able to cover the physical type, whereas in a sink it is the other
> way round and the physical type should be able to cover the logical type.
> Besides, the decimal type should be handled more carefully: when the target
> type is Legacy(Decimal), it should be able to accept a decimal type of any
> precision.
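
The source/sink asymmetry described in the issue can be sketched in plain Java as follows. This is only an illustration under simplifying assumptions, not the actual {{TypeMappingUtils}} logic: the class {{DirectionalTypeCheckSketch}}, the {{VarChar}} stand-in, and the methods {{covers}} and {{isCompatible}} are all hypothetical, and only character strings are modelled ({{STRING}} being {{VARCHAR(2147483647)}}).

```java
/** Hypothetical sketch, not Flink code: direction-aware physical/logical type check. */
final class DirectionalTypeCheckSketch {

    enum Direction { SOURCE, SINK }

    /** Simplified stand-in for VARCHAR(n). */
    static final class VarChar {
        final int length;

        VarChar(int length) {
            this.length = length;
        }
    }

    /** STRING is modelled as VARCHAR with the maximum length. */
    static final VarChar STRING = new VarChar(Integer.MAX_VALUE);

    /** A type covers another if every value of the other type fits into it. */
    static boolean covers(VarChar target, VarChar value) {
        return target.length >= value.length;
    }

    /**
     * Source: data flows physical -> logical, so the logical type must cover the physical one.
     * Sink: data flows logical -> physical, so the physical type must cover the logical one.
     */
    static boolean isCompatible(VarChar physical, VarChar logical, Direction direction) {
        return direction == Direction.SOURCE
                ? covers(logical, physical)
                : covers(physical, logical);
    }

    public static void main(String[] args) {
        VarChar varchar10 = new VarChar(10);
        // The case from this issue: logical VARCHAR(10) written to a physical STRING field.
        System.out.println(isCompatible(STRING, varchar10, Direction.SINK));
        // The same pair on the source side would risk truncation.
        System.out.println(isCompatible(STRING, varchar10, Direction.SOURCE));
    }
}
```

With a check of this shape, the {{VARCHAR(10)}} column from the example would be accepted by a sink whose physical type is {{STRING}}, while the same pair would still be rejected for a source.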
--
This message was sent by Atlassian Jira
(v8.3.4#803005)