[ https://issues.apache.org/jira/browse/FLINK-20301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17239116#comment-17239116 ]
Timo Walther commented on FLINK-20301:
--------------------------------------
[~hehuiyuan] We improved the new type system significantly in Flink 1.11
and Flink 1.12, so errors like this are already fixed in newer versions by the
new source/sink interface architecture. In 1.10 you need to make sure that you
use either the old type system everywhere or the new type system everywhere.
DECIMAL and LEGACY DECIMAL have different semantics: the latter has variable
precision and scale. In your case, to be compatible, the decimal should be
declared with `DECIMAL(38, 18)` in the new type system or `Types.DECIMAL` in
the old type system.
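
To make the difference in semantics concrete, here is a minimal sketch in plain Java. It is not Flink code: the class `FixedDecimalSketch` and the method `toDecimal38_18` are hypothetical names, and returning `null` on overflow is a choice made for this sketch only. It assumes the legacy decimal is backed by `java.math.BigDecimal`, where every value carries its own scale, and contrasts that with a fixed precision of 38 and scale of 18.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/** Illustrative only: models fixed DECIMAL(38, 18) semantics with BigDecimal. */
final class FixedDecimalSketch {
    static final int PRECISION = 38; // total number of digits
    static final int SCALE = 18;     // digits after the decimal point

    /**
     * Rescales the value to SCALE fractional digits. Returns null (a choice of
     * this sketch) when the result needs more than PRECISION digits in total.
     */
    static BigDecimal toDecimal38_18(BigDecimal value) {
        BigDecimal rescaled = value.setScale(SCALE, RoundingMode.HALF_UP);
        return rescaled.precision() <= PRECISION ? rescaled : null;
    }

    public static void main(String[] args) {
        // Variable scale: each BigDecimal value carries its own scale.
        System.out.println(new BigDecimal("1.5").scale());   // 1
        System.out.println(new BigDecimal("1.500").scale()); // 3

        // Fixed scale: after conversion every value has scale 18.
        System.out.println(toDecimal38_18(new BigDecimal("1.5")).scale());   // 18
        System.out.println(toDecimal38_18(new BigDecimal("1.500")).scale()); // 18

        // 21 integer digits plus 18 fractional digits exceed 38 digits in total.
        System.out.println(toDecimal38_18(new BigDecimal("1e20"))); // null
    }
}
```

Because a variable-scale value cannot be checked against a fixed type without looking at the data, mixing the two type systems for one field leads to the kind of mismatch reported in this issue.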
> Flink sql 1.10 : Legacy Decimal and decimal for Array that is not Compatible
> ------------------------------------------------------------------------------
>
> Key: FLINK-20301
> URL: https://issues.apache.org/jira/browse/FLINK-20301
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.10.0
> Reporter: hehuiyuan
> Priority: Minor
> Attachments: image-2020-11-23-23-48-02-102.png
>
>
> The error log:
>
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Type ARRAY<DECIMAL(38, 18)> of table field 'numbers' does not match with the
> physical type ARRAY<LEGACY('DECIMAL', 'DECIMAL')> of the 'numbers' field of
> the TableSource return type.
>     at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:160)
>     at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:185)
>     at org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:246)
>     at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>     at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>     at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>     at org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:228)
>     at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:206)
>     at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:110)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:118)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> {code}
>
>
>
> Background:
> Flink SQL, Blink planner, version 1.10
> The schema for the TableSource is given as JSON:
>
> {code:java}
> "{type:'object',properties:{age:{type:'number'},numbers: { type: 'array',
> items: { type: 'number' } },name:{type:'string'},dt:{type: 'string', format:
> 'date-time'},timehour:{type: 'string', format: 'time'} }}"
> {code}
>
> Validation throws this exception:
> Type ARRAY<DECIMAL(38, 18)> of table field 'numbers' does not match with the
> physical type ARRAY<LEGACY('DECIMAL', 'DECIMAL')> of the 'numbers' field of
> the TableSource return type.
> The type `Array[Decimal]` should be considered.
> !image-2020-11-23-23-48-02-102.png|width=594,height=364!
>
> I think a `visit(ArrayType sourceType)` case should be added to the check:
> {code:java}
> @Override
> public Boolean visit(ArrayType sourceType1) {
>     LogicalType sourceElementType = sourceType1.getElementType();
>     // Only an array of legacy DECIMAL elements needs special handling.
>     if (sourceElementType instanceof LegacyTypeInformationType
>             && sourceElementType.getTypeRoot() == LogicalTypeRoot.DECIMAL) {
>         // The target must be an array whose element type is DECIMAL.
>         if (!(targetType instanceof ArrayType
>                 && ((ArrayType) targetType).getElementType() instanceof DecimalType)) {
>             return false;
>         }
>         DecimalType logicalDecimalType =
>                 (DecimalType) ((ArrayType) targetType).getElementType();
>         // A legacy decimal is only compatible with DECIMAL(38, 18).
>         if (logicalDecimalType.getPrecision() != DecimalType.MAX_PRECISION
>                 || logicalDecimalType.getScale() != 18) {
>             throw new ValidationException(
>                     "Legacy decimal type can only be mapped to DECIMAL(38, 18).");
>         }
>     }
>     // All other arrays are accepted by this case.
>     return true;
> }
> {code}
>
>
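
The array check proposed in the quoted issue can be tried out without Flink on the classpath. The sketch below is an assumption-laden model, not Flink's implementation: `SketchType`, `Decimal`, `LegacyDecimal`, `Array`, and `isCompatible` are hypothetical stand-ins for the real `LogicalType` classes. Unlike the proposal, it returns `false` instead of throwing a `ValidationException`, and it recurses so that nested arrays are handled as well.

```java
/** Illustrative only: a tiny stand-in type model, not Flink's LogicalType classes. */
final class ArrayDecimalCompatSketch {
    interface SketchType {}

    /** Fixed-precision decimal, like DECIMAL(p, s) in the new type system. */
    static final class Decimal implements SketchType {
        final int precision;
        final int scale;
        Decimal(int precision, int scale) {
            this.precision = precision;
            this.scale = scale;
        }
    }

    /** Stand-in for LEGACY('DECIMAL', 'DECIMAL'): no fixed precision or scale. */
    static final class LegacyDecimal implements SketchType {}

    static final class Array implements SketchType {
        final SketchType element;
        Array(SketchType element) {
            this.element = element;
        }
    }

    /** Returns true if the physical (source) type may back the logical (declared) type. */
    static boolean isCompatible(SketchType physical, SketchType logical) {
        if (physical instanceof Array) {
            // An array only matches an array; compare the element types recursively.
            return logical instanceof Array
                    && isCompatible(((Array) physical).element, ((Array) logical).element);
        }
        if (physical instanceof LegacyDecimal) {
            // A legacy decimal may only be mapped to DECIMAL(38, 18).
            return logical instanceof Decimal
                    && ((Decimal) logical).precision == 38
                    && ((Decimal) logical).scale == 18;
        }
        if (physical instanceof Decimal && logical instanceof Decimal) {
            // Two fixed decimals must agree on precision and scale.
            return ((Decimal) physical).precision == ((Decimal) logical).precision
                    && ((Decimal) physical).scale == ((Decimal) logical).scale;
        }
        return false;
    }

    public static void main(String[] args) {
        SketchType physical = new Array(new LegacyDecimal());
        System.out.println(isCompatible(physical, new Array(new Decimal(38, 18)))); // true
        System.out.println(isCompatible(physical, new Array(new Decimal(10, 2))));  // false
        System.out.println(isCompatible(physical, new Decimal(38, 18)));            // false
    }
}
```

The first call mirrors the failing case from the error log: a physical `ARRAY<LEGACY('DECIMAL', 'DECIMAL')>` against a declared `ARRAY<DECIMAL(38, 18)>`, which this model accepts.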