[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089258#comment-17089258
 ] 

Terry Wang commented on FLINK-17313:
------------------------------------

I open a 
[https://github.com/apache/flink/pull/11848|https://github.com/apache/flink/pull/11848]
 to help understanding and solve this validation exception.

> Validation error when insert decimal/timestamp/varchar with precision into 
> sink using TypeInformation of row
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17313
>                 URL: https://issues.apache.org/jira/browse/FLINK-17313
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Terry Wang
>            Priority: Major
>              Labels: pull-request-available
>
> Test code like follwing(in blink planner):
> {code:java}
>               tEnv.sqlUpdate("create table randomSource (" +
>                                               "               a varchar(10)," 
> +
>                                               "               b 
> decimal(20,2)" +
>                                               "       ) with (" +
>                                               "               'type' = 
> 'random'," +
>                                               "               'count' = '10'" 
> +
>                                               "       )");
>               tEnv.sqlUpdate("create table printSink (" +
>                                               "               a varchar(10)," 
> +
>                                               "               b 
> decimal(22,2)," +
>                                               "               c 
> timestamp(3)," +
>                                               "       ) with (" +
>                                               "       'type' = 'print'" +
>                                               "       )");
>               tEnv.sqlUpdate("insert into printSink select *, 
> current_timestamp from randomSource");
>               tEnv.execute("");
> {code}
> Print TableSink implements UpsertStreamTableSink and it's getReocrdType is as 
> following:
> {code:java}
> public TypeInformation<Row> getRecordType() {
>               return getTableSchema().toRowType();
>       }
> {code}
> Varchar column validation exception is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table 
> field 'a' does not match with the physical type STRING of the 'a' field of 
> the TableSink consumed type.
>       at 
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
>       at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
>       at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
>       at 
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
>       at 
> org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
>       at 
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
>       at 
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
>       at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
>       at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>       at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
>       at scala.Option.map(Option.scala:146)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:863)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:855)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:822)
> Other type validation exception is similar, I dig into and think it's caused 
> by TypeMappingUtils#checkPhysicalLogicalTypeCompatible. It seems that the 
> method doesn't consider the different physical and logical type validation 
> logic of source and sink:   logical type should be able to cover the physical 
> type in source, but physical type should be able to cover the logic type in 
> sink vice verse. Besides, the decimal type should be taken more carefully, 
> when target type is Legacy(Decimal), it should be able to accept any 
> precision decimal type.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to