[ https://issues.apache.org/jira/browse/FLINK-18339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
hehuiyuan updated FLINK-18339: ------------------------------ Attachment: image-2020-06-17-15-24-24-093.png > ValidationException exception that field typeinformation in TableSchema and > in TableSource return type for blink > ----------------------------------------------------------------------------------------------------------------- > > Key: FLINK-18339 > URL: https://issues.apache.org/jira/browse/FLINK-18339 > Project: Flink > Issue Type: Bug > Affects Versions: 1.9.0 > Reporter: hehuiyuan > Priority: Minor > Attachments: image-2020-06-17-10-37-48-166.png, > image-2020-06-17-10-53-08-424.png, image-2020-06-17-15-24-16-290.png, > image-2020-06-17-15-24-24-093.png > > > The type of `datatime` field is OBJECT_ARRAY<STRING>. > > Exception: > > {code:java} > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Type LEGACY(BasicArrayTypeInfo<String>) of table field 'datatime' does not > match with type BasicArrayTypeInfo<String> of the field 'datatime' of the > TableSource return type.Exception in thread "main" > org.apache.flink.table.api.ValidationException: Type > LEGACY(BasicArrayTypeInfo<String>) of table field 'datatime' does not match > with type BasicArrayTypeInfo<String> of the field 'datatime' of the > TableSource return type. 
at > org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121) > at > org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at > scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at > org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84) > at > 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:141) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:55) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlan(StreamExecGroupWindowAggregate.scala:55) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:119) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) > at > 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:893) at > scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at > scala.collection.AbstractIterable.foreach(Iterable.scala:54) at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at > scala.collection.AbstractTraversable.map(Traversable.scala:104) at > org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:152) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348) > {code} > > Usage: > `fieldNames` is the array of field names > `fieldsType` is the array of field types > > We can obtain a field's type information this way: > > {code:java} > TypeInformation typeInformation = > TypeStringUtils.readTypeInfo("OBJECT_ARRAY<STRING>"); > {code} > > > {code:java} > ConnectTableDescriptor d = > descriptor.withFormat( > new Csv().fieldDelimiter(fielddelimiter).schema(new > RowTypeInfo(fieldsType,fieldNames)) > ) > .withSchema( > schema > ); > {code} > > (1) Calling toString on RowTypeInfo(fieldsType,fieldNames) gives: > Row(name: String, age: Integer, sex: String, datatime: > BasicArrayTypeInfo<String>) > `datatime` field type is 
BasicArrayTypeInfo<String>. > > (2) Schema schema: > schema = schema.field(fieldNames[i],fieldsType[i]); > `datatime` field type is BasicArrayTypeInfo<String> > !image-2020-06-17-10-37-48-166.png! > > Code analysis: > > `schemaBuilder.field(name, type)` is called when the TableSchema is created: > {code:java} > public Builder field(String name, TypeInformation<?> typeInfo) { > return field(name, fromLegacyInfoToDataType(typeInfo)); > } > public static DataType fromLegacyInfoToDataType(TypeInformation<?> typeInfo) { > return LegacyTypeInfoDataTypeConverter.toDataType(typeInfo); > } > public static DataType toDataType(TypeInformation<?> typeInfo) { > // time indicators first as their hashCode/equals is shared with those of > regular timestamps > if (typeInfo instanceof TimeIndicatorTypeInfo) { > return convertToTimeAttributeType((TimeIndicatorTypeInfo) typeInfo); > } > final DataType foundDataType = typeInfoDataTypeMap.get(typeInfo); > if (foundDataType != null) { > return foundDataType; > } > if (typeInfo instanceof RowTypeInfo) { > return convertToRowType((RowTypeInfo) typeInfo); > } > else if (typeInfo instanceof ObjectArrayTypeInfo) { > return convertToArrayType( > typeInfo.getTypeClass(), > ((ObjectArrayTypeInfo) typeInfo).getComponentInfo()); > } > else if (typeInfo instanceof BasicArrayTypeInfo) { > return createLegacyType(LogicalTypeRoot.ARRAY, typeInfo); > } > else if (typeInfo instanceof MultisetTypeInfo) { > return convertToMultisetType(((MultisetTypeInfo) > typeInfo).getElementTypeInfo()); > } > else if (typeInfo instanceof MapTypeInfo) { > return convertToMapType((MapTypeInfo) typeInfo); > } > else if (typeInfo instanceof CompositeType) { > return createLegacyType(LogicalTypeRoot.STRUCTURED_TYPE, typeInfo); > } > return createLegacyType(LogicalTypeRoot.ANY, typeInfo); > } > {code} > If the type information is a BasicArrayTypeInfo, the following code is called: > {code:java} > else if (typeInfo instanceof BasicArrayTypeInfo) { > return createLegacyType(LogicalTypeRoot.ARRAY, 
typeInfo); > } > private static DataType createLegacyType(LogicalTypeRoot typeRoot, > TypeInformation<?> typeInfo) { > return new AtomicDataType(new LegacyTypeInformationType<>(typeRoot, > typeInfo)) > .bridgedTo(typeInfo.getTypeClass()); > } > {code} > `datatime` field type is LEGACY(BasicArrayTypeInfo<String>) > > > > > > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)