[
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089382#comment-17089382
]
Wenlong Lyu commented on FLINK-17313:
-------------------------------------
[~dwysakowicz] I don't think the logical schema and the physical schema have to match
exactly:
Currently we allow a source column whose logical type is varchar(10) while its physical
type is varchar(5); see `CastAvoidanceChecker` used in the compatibility check.
The requirement on a source is that we must be able to convert a physical record of the
source into an internal record according to the physicalDataType and the logical DataType.
On sinks the requirement should be reversed: we must be able to convert an internal
record into a physical record for the sink, so we can allow a sink column whose logical
type is varchar(5) but whose physical type is varchar(10).
On validation: we already have a validation that makes sure the schema of a sink's source
query matches the logical type, so the validation between the logical type and the
physical type can be much looser, I think.
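A rough sketch of the direction-aware check I have in mind (the helper class below is hypothetical, not existing Flink code; it assumes LogicalTypeCasts#supportsAvoidingCast expresses "no cast needed in that direction"):
{code:java}
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;

public final class DirectionAwareCompatibility {

    // Hypothetical check: which side has to be the "wider" type depends on
    // whether we read from a source or write to a sink.
    static void checkCompatible(LogicalType logicalType, LogicalType physicalType, boolean isSource) {
        boolean compatible = isSource
                // source: the logical type only has to be able to hold the physical value,
                // e.g. logical varchar(10) over physical varchar(5)
                ? LogicalTypeCasts.supportsAvoidingCast(physicalType, logicalType)
                // sink: the physical type only has to be able to hold the logical value,
                // e.g. logical varchar(5) into physical varchar(10)
                : LogicalTypeCasts.supportsAvoidingCast(logicalType, physicalType);
        if (!compatible) {
            throw new ValidationException(
                    "Logical type " + logicalType + " and physical type " + physicalType
                            + " are not compatible for a " + (isSource ? "source" : "sink") + ".");
        }
    }
}
{code}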
> Validation error when insert decimal/timestamp/varchar with precision into
> sink using TypeInformation of row
> ------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-17313
> URL: https://issues.apache.org/jira/browse/FLINK-17313
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Terry Wang
> Priority: Major
> Labels: pull-request-available
>
> Test code like the following (in blink planner):
> {code:java}
> tEnv.sqlUpdate("create table randomSource (" +
>         " a varchar(10)," +
>         " b decimal(20,2)" +
>         " ) with (" +
>         " 'type' = 'random'," +
>         " 'count' = '10'" +
>         " )");
> tEnv.sqlUpdate("create table printSink (" +
>         " a varchar(10)," +
>         " b decimal(22,2)," +
>         " c timestamp(3)," +
>         " ) with (" +
>         " 'type' = 'print'" +
>         " )");
> tEnv.sqlUpdate("insert into printSink select *, current_timestamp from randomSource");
> tEnv.execute("");
> {code}
> The print TableSink implements UpsertStreamTableSink and its getRecordType is as
> follows:
> {code:java}
> public TypeInformation<Row> getRecordType() {
>     return getTableSchema().toRowType();
> }
> {code}
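> A minimal sketch of why the sink's consumed physical type comes back as STRING, assuming the legacy TypeInformation bridge behind toRowType() (an illustration only, not code from the issue):
> {code:java}
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.TableSchema;
> import org.apache.flink.types.Row;
>
> public final class SinkConsumedTypeDemo {
>     public static void main(String[] args) {
>         // The sink schema declared in the DDL above (timestamp column omitted for brevity).
>         TableSchema schema = TableSchema.builder()
>                 .field("a", DataTypes.VARCHAR(10))
>                 .field("b", DataTypes.DECIMAL(22, 2))
>                 .build();
>         // toRowType() bridges through TypeInformation, which carries no VARCHAR length
>         // or DECIMAL precision, so field 'a' is reported as plain STRING and the
>         // logical/physical compatibility check later rejects it against VARCHAR(10).
>         TypeInformation<Row> consumedType = schema.toRowType();
>         System.out.println(consumedType);
>     }
> }
> {code}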
> The varchar column validation exception is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table field 'a' does not match with the physical type STRING of the 'a' field of the TableSink consumed type.
> 	at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
> 	at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
> 	at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
> 	at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
> 	at org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
> 	at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
> 	at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
> 	at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
> 	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
> 	at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
> 	at scala.Option.map(Option.scala:146)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:863)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:855)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:822)
> The validation exceptions for the other types are similar. I dug into this and think it
> is caused by TypeMappingUtils#checkPhysicalLogicalTypeCompatible. The method doesn't
> consider that sources and sinks need different validation logic between the physical and
> the logical type: in a source the logical type should be able to cover the physical
> type, while in a sink the physical type should be able to cover the logical type.
> Besides, the decimal type should be handled more carefully: when the target type is
> Legacy(Decimal), it should accept a decimal of any precision.
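> A rough sketch of the decimal special case described above (the helper below is hypothetical and only illustrates the intent, it is not the actual TypeMappingUtils code):
> {code:java}
> import org.apache.flink.table.types.logical.DecimalType;
> import org.apache.flink.table.types.logical.LegacyTypeInformationType;
> import org.apache.flink.table.types.logical.LogicalType;
> import org.apache.flink.table.types.logical.LogicalTypeRoot;
>
> public final class DecimalCompatibility {
>
>     // Hypothetical helper: a physical Legacy(Decimal) type carries no fixed
>     // precision/scale, so it should accept a DECIMAL logical type of any
>     // precision instead of failing the exact-match check.
>     static boolean isCompatibleDecimal(LogicalType logicalType, LogicalType physicalType) {
>         return logicalType instanceof DecimalType
>                 && physicalType instanceof LegacyTypeInformationType
>                 && physicalType.getTypeRoot() == LogicalTypeRoot.DECIMAL;
>     }
> }
> {code}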