[ 
https://issues.apache.org/jira/browse/FLINK-18339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hehuiyuan updated FLINK-18339:
------------------------------
    Description: 
The type of the `datatime` field is OBJECT_ARRAY<STRING>.

 

Exception:

 
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Type 
LEGACY(BasicArrayTypeInfo<String>) of table field 'datatime' does not match 
with type BasicArrayTypeInfo<String> of the field 'datatime' of the TableSource 
return type.
Exception in thread "main" 
org.apache.flink.table.api.ValidationException: Type 
LEGACY(BasicArrayTypeInfo<String>) of table field 'datatime' does not match 
with type BasicArrayTypeInfo<String> of the field 'datatime' of the TableSource 
return type.
 at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
 at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at 
org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:141)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:55)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlan(StreamExecGroupWindowAggregate.scala:55)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:119)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
 at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
 at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:152)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)
{code}
 

Usage:

`fieldNames` is the array of field names.

`fieldsType` is the array of field types.

 

The type information for a field can be obtained as follows:

 
{code:java}
TypeInformation typeInformation = 
TypeStringUtils.readTypeInfo("OBJECT_ARRAY<STRING>");
{code}
 

 
{code:java}
ConnectTableDescriptor d =
  descriptor.withFormat(
    new Csv().fieldDelimiter(fielddelimiter).schema(new 
RowTypeInfo(fieldsType,fieldNames))
  )
  .withSchema(
    schema
  );
{code}
 

(1) Calling toString() on RowTypeInfo(fieldsType, fieldNames) returns:

Row(name: String, age: Integer, sex: String, datatime: 
BasicArrayTypeInfo<String>)

`datatime` field type is BasicArrayTypeInfo<String>.

 

(2) Schema schema:

  schema = schema.field(fieldNames[i],fieldsType[i]);

`datatime` field type is BasicArrayTypeInfo<String>

  !image-2020-06-17-10-37-48-166.png!

 

Code analysis:

 

`schemaBuilder.field(name, type)` is called when the TableSchema is created:
{code:java}
public Builder field(String name, TypeInformation<?> typeInfo) {
   return field(name, fromLegacyInfoToDataType(typeInfo));
}

public static DataType fromLegacyInfoToDataType(TypeInformation<?> typeInfo) {
   return LegacyTypeInfoDataTypeConverter.toDataType(typeInfo);
}

public static DataType toDataType(TypeInformation<?> typeInfo) {
   // time indicators first as their hashCode/equals is shared with those of 
regular timestamps
   if (typeInfo instanceof TimeIndicatorTypeInfo) {
      return convertToTimeAttributeType((TimeIndicatorTypeInfo) typeInfo);
   }

   final DataType foundDataType = typeInfoDataTypeMap.get(typeInfo);
   if (foundDataType != null) {
      return foundDataType;
   }

   if (typeInfo instanceof RowTypeInfo) {
      return convertToRowType((RowTypeInfo) typeInfo);
   }

   else if (typeInfo instanceof ObjectArrayTypeInfo) {
      return convertToArrayType(
         typeInfo.getTypeClass(),
         ((ObjectArrayTypeInfo) typeInfo).getComponentInfo());
   }

   else if (typeInfo instanceof BasicArrayTypeInfo) {
      return createLegacyType(LogicalTypeRoot.ARRAY, typeInfo);
   }

   else if (typeInfo instanceof MultisetTypeInfo) {
      return convertToMultisetType(((MultisetTypeInfo) 
typeInfo).getElementTypeInfo());
   }

   else if (typeInfo instanceof MapTypeInfo) {
      return convertToMapType((MapTypeInfo) typeInfo);
   }

   else if (typeInfo instanceof CompositeType) {
      return createLegacyType(LogicalTypeRoot.STRUCTURED_TYPE, typeInfo);
   }

   return createLegacyType(LogicalTypeRoot.ANY, typeInfo);
}

{code}
If the type information is a BasicArrayTypeInfo, the following code is called:
{code:java}

else if (typeInfo instanceof BasicArrayTypeInfo) {
   return createLegacyType(LogicalTypeRoot.ARRAY, typeInfo);
}

private static DataType createLegacyType(LogicalTypeRoot typeRoot, 
TypeInformation<?> typeInfo) {
   return new AtomicDataType(new LegacyTypeInformationType<>(typeRoot, 
typeInfo))
      .bridgedTo(typeInfo.getTypeClass());
}
{code}
As a result, the `datatime` field type in the TableSchema is LEGACY(BasicArrayTypeInfo<String>).

 

 

 

 

 

 

 

 

  was:
The  type of `datatime` field   is OBJECT_ARRAY<STRING>.

 

Exception:

 
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Type 
LEGACY(BasicArrayTypeInfo<String>) of table field 'datatime' does not match 
with type BasicArrayTypeInfo<String> of the field 'datatime' of the TableSource 
return type.Exception in thread "main" 
org.apache.flink.table.api.ValidationException: Type 
LEGACY(BasicArrayTypeInfo<String>) of table field 'datatime' does not match 
with type BasicArrayTypeInfo<String> of the field 'datatime' of the TableSource 
return type. at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
 at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at 
org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:141)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:55)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlan(StreamExecGroupWindowAggregate.scala:55)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:119)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
 at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
 at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
 at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
 at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:152)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)
{code}
 

Usage:

`fieldNames` is field name array

`fieldsType` is field type array

 

We can acquire field typeinformation by the way:

 
{code:java}
TypeInformation typeInformation = 
TypeStringUtils.readTypeInfo("OBJECT_ARRAY<STRING>");
{code}
 

 
{code:java}
ConnectTableDescriptor d =
  descriptor.withFormat(
    new Csv().fieldDelimiter(fielddelimiter).schema(new 
RowTypeInfo(fieldsType,fieldNames))
  )
  .withSchema(
    schema
  );
{code}
 

(1) RowTypeInfo(fieldsType,fieldNames) calls toString method:

Row(name: String, age: Integer, sex: String, datatime: 
BasicArrayTypeInfo<String>)

`datatime` field type is BasicArrayTypeInfo<String>.

 

(2)Schema shema :

  schema = schema.field(fieldNames[i],fieldsType[i]);

`datatime` field type is BasicArrayTypeInfo<String>

  !image-2020-06-17-10-37-48-166.png!

 

Code analysis:

 

`schemaBuilder.field(name, type)`  is called when create TableSchema
{code:java}
public Builder field(String name, TypeInformation<?> typeInfo) {
   return field(name, fromLegacyInfoToDataType(typeInfo));
}

public static DataType fromLegacyInfoToDataType(TypeInformation<?> typeInfo) {
   return LegacyTypeInfoDataTypeConverter.toDataType(typeInfo);
}

public static DataType toDataType(TypeInformation<?> typeInfo) {
   // time indicators first as their hashCode/equals is shared with those of 
regular timestamps
   if (typeInfo instanceof TimeIndicatorTypeInfo) {
      return convertToTimeAttributeType((TimeIndicatorTypeInfo) typeInfo);
   }

   final DataType foundDataType = typeInfoDataTypeMap.get(typeInfo);
   if (foundDataType != null) {
      return foundDataType;
   }

   if (typeInfo instanceof RowTypeInfo) {
      return convertToRowType((RowTypeInfo) typeInfo);
   }

   else if (typeInfo instanceof ObjectArrayTypeInfo) {
      return convertToArrayType(
         typeInfo.getTypeClass(),
         ((ObjectArrayTypeInfo) typeInfo).getComponentInfo());
   }

   else if (typeInfo instanceof BasicArrayTypeInfo) {
      return createLegacyType(LogicalTypeRoot.ARRAY, typeInfo);
   }

   else if (typeInfo instanceof MultisetTypeInfo) {
      return convertToMultisetType(((MultisetTypeInfo) 
typeInfo).getElementTypeInfo());
   }

   else if (typeInfo instanceof MapTypeInfo) {
      return convertToMapType((MapTypeInfo) typeInfo);
   }

   else if (typeInfo instanceof CompositeType) {
      return createLegacyType(LogicalTypeRoot.STRUCTURED_TYPE, typeInfo);
   }

   return createLegacyType(LogicalTypeRoot.ANY, typeInfo);
}

{code}
if typeinformation is BasicArrayTypeinfo , the code is called:
{code:java}
 return createLegacyType(LogicalTypeRoot.ARRAY, typeInfo); }



private static DataType createLegacyType(LogicalTypeRoot typeRoot, 
TypeInformation<?> typeInfo) {
   return new AtomicDataType(new LegacyTypeInformationType<>(typeRoot, 
typeInfo))
      .bridgedTo(typeInfo.getTypeClass());
}
{code}
`datatime` field type is  LEGACY(BasicArrayTypeInfo<String>)

 

 

 

 

 

 

 

 


> ValidationException: field type information in TableSchema does not match 
> the TableSource return type for blink
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18339
>                 URL: https://issues.apache.org/jira/browse/FLINK-18339
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.9.0
>            Reporter: hehuiyuan
>            Priority: Minor
>         Attachments: image-2020-06-17-10-37-48-166.png, 
> image-2020-06-17-10-53-08-424.png
>
>
> The type of the `datatime` field is OBJECT_ARRAY<STRING>.
>  
> Exception:
>  
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Type LEGACY(BasicArrayTypeInfo<String>) of table field 'datatime' does not 
> match with type BasicArrayTypeInfo<String> of the field 'datatime' of the 
> TableSource return type.Exception in thread "main" 
> org.apache.flink.table.api.ValidationException: Type 
> LEGACY(BasicArrayTypeInfo<String>) of table field 'datatime' does not match 
> with type BasicArrayTypeInfo<String> of the field 'datatime' of the 
> TableSource return type. at 
> org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
>  at 
> org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at 
> org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:141)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.scala:55)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregate.translateToPlan(StreamExecGroupWindowAggregate.scala:55)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:119)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
>  at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>  at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>  at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:152)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)
> {code}
>  
> Usage:
> `fieldNames` is the array of field names.
> `fieldsType` is the array of field types.
>  
> The type information for a field can be obtained as follows:
>  
> {code:java}
> TypeInformation typeInformation = 
> TypeStringUtils.readTypeInfo("OBJECT_ARRAY<STRING>");
> {code}
>  
>  
> {code:java}
> ConnectTableDescriptor d =
>   descriptor.withFormat(
>     new Csv().fieldDelimiter(fielddelimiter).schema(new 
> RowTypeInfo(fieldsType,fieldNames))
>   )
>   .withSchema(
>     schema
>   );
> {code}
>  
> (1) Calling toString() on RowTypeInfo(fieldsType, fieldNames) returns:
> Row(name: String, age: Integer, sex: String, datatime: 
> BasicArrayTypeInfo<String>)
> `datatime` field type is BasicArrayTypeInfo<String>.
>  
> (2) Schema schema:
>   schema = schema.field(fieldNames[i],fieldsType[i]);
> `datatime` field type is BasicArrayTypeInfo<String>
>   !image-2020-06-17-10-37-48-166.png!
>  
> Code analysis:
>  
> `schemaBuilder.field(name, type)` is called when the TableSchema is created:
> {code:java}
> public Builder field(String name, TypeInformation<?> typeInfo) {
>    return field(name, fromLegacyInfoToDataType(typeInfo));
> }
> public static DataType fromLegacyInfoToDataType(TypeInformation<?> typeInfo) {
>    return LegacyTypeInfoDataTypeConverter.toDataType(typeInfo);
> }
> public static DataType toDataType(TypeInformation<?> typeInfo) {
>    // time indicators first as their hashCode/equals is shared with those of 
> regular timestamps
>    if (typeInfo instanceof TimeIndicatorTypeInfo) {
>       return convertToTimeAttributeType((TimeIndicatorTypeInfo) typeInfo);
>    }
>    final DataType foundDataType = typeInfoDataTypeMap.get(typeInfo);
>    if (foundDataType != null) {
>       return foundDataType;
>    }
>    if (typeInfo instanceof RowTypeInfo) {
>       return convertToRowType((RowTypeInfo) typeInfo);
>    }
>    else if (typeInfo instanceof ObjectArrayTypeInfo) {
>       return convertToArrayType(
>          typeInfo.getTypeClass(),
>          ((ObjectArrayTypeInfo) typeInfo).getComponentInfo());
>    }
>    else if (typeInfo instanceof BasicArrayTypeInfo) {
>       return createLegacyType(LogicalTypeRoot.ARRAY, typeInfo);
>    }
>    else if (typeInfo instanceof MultisetTypeInfo) {
>       return convertToMultisetType(((MultisetTypeInfo) 
> typeInfo).getElementTypeInfo());
>    }
>    else if (typeInfo instanceof MapTypeInfo) {
>       return convertToMapType((MapTypeInfo) typeInfo);
>    }
>    else if (typeInfo instanceof CompositeType) {
>       return createLegacyType(LogicalTypeRoot.STRUCTURED_TYPE, typeInfo);
>    }
>    return createLegacyType(LogicalTypeRoot.ANY, typeInfo);
> }
> {code}
> If the type information is a BasicArrayTypeInfo, the following code is called:
> {code:java}
> else if (typeInfo instanceof BasicArrayTypeInfo) {
>    return createLegacyType(LogicalTypeRoot.ARRAY, typeInfo);
> }
> private static DataType createLegacyType(LogicalTypeRoot typeRoot, 
> TypeInformation<?> typeInfo) {
>    return new AtomicDataType(new LegacyTypeInformationType<>(typeRoot, 
> typeInfo))
>       .bridgedTo(typeInfo.getTypeClass());
> }
> {code}
> As a result, the `datatime` field type in the TableSchema is LEGACY(BasicArrayTypeInfo<String>).
>  
>  
>  
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to