[ 
https://issues.apache.org/jira/browse/FLINK-20301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17238766#comment-17238766
 ] 

hehuiyuan commented on FLINK-20301:
-----------------------------------

[~twalthr] , 

custom TableSource :  DebugTableSource

custom TableSink: DebugTableSink

SQL : 

"insert into tablesink select * from tablesource"

 
{code:java}
/**
 * A table source used for local debugging: it replays a fixed list of raw test
 * records through a {@link DeserializationSchema} and exposes optional proctime /
 * rowtime attributes to the planner.
 */
public class DebugTableSource
    implements StreamTableSource<Row>, DefinedProctimeAttribute, DefinedRowtimeAttributes {

  /** Name of the processing-time attribute; empty if no proctime field is defined. */
  private final Optional<String> proctimeAttribute;

  /** Descriptors for the rowtime attributes. */
  private final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;

  /** Logical schema of the table as seen by the planner. */
  private final TableSchema schema;

  /** Turns each raw test record into a {@link Row}; also defines the produced type. */
  private final DeserializationSchema<Row> deserializationSchema;

  /** Raw test records that the debug source replays. */
  private final List<String> testData;

  /** Format of the test data (e.g. "json", "jdwdata"). */
  private final String dataType;

  // How long the debug source keeps emitting; presumably milliseconds —
  // NOTE(review): confirm the unit against DebugSource.
  private final Long runningTime;

  public DebugTableSource(
      Optional<String> proctimeAttribute,
      List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
      TableSchema schema,
      DeserializationSchema<Row> deserializationSchema,
      List<String> testData,
      String dataType,
      Long runningTime) {
    this.proctimeAttribute = proctimeAttribute;
    this.rowtimeAttributeDescriptors = rowtimeAttributeDescriptors;
    this.schema = schema;
    this.deserializationSchema = deserializationSchema;
    this.testData = testData;
    this.dataType = dataType;
    this.runningTime = runningTime;
  }

  @Nullable
  @Override
  public String getProctimeAttribute() {
    // Never call proctimeAttribute.get() directly — it throws when the Optional
    // is empty; orElse(null) matches the @Nullable contract of this method.
    return proctimeAttribute.orElse(null);
  }

  @Override
  public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
    return rowtimeAttributeDescriptors;
  }

  @Override
  public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
    // Build the debug source over the fixed test data and make the stream's type
    // explicit via the deserialization schema's produced TypeInformation.
    DebugSource source = new DebugSource(testData, deserializationSchema, dataType, runningTime);
    return env.addSource(source).returns(deserializationSchema.getProducedType());
  }

  @Override
  public TableSchema getTableSchema() {
    return schema;
  }

  @Override
  public DataType getProducedDataType() {
    // Bridge the legacy TypeInformation into the new DataType stack.
    TypeInformation<Row> produced = deserializationSchema.getProducedType();
    return TypeConversions.fromLegacyInfoToDataType(produced);
  }

  @Override
  public TypeInformation<Row> getReturnType() {
    return deserializationSchema.getProducedType();
  }

  @Override
  public String explainSource() {
    return TableConnectorUtils.generateRuntimeName(this.getClass(), schema.getFieldNames());
  }
}


{code}
 

 

 

> Flink sql 1.10 : Legacy Decimal and decimal  for Array  that is not Compatible
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-20301
>                 URL: https://issues.apache.org/jira/browse/FLINK-20301
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: hehuiyuan
>            Priority: Minor
>         Attachments: image-2020-11-23-23-48-02-102.png
>
>
> The error log:
>  
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Type ARRAY<DECIMAL(38, 18)> of table field 'numbers' does not match with the 
> physical type ARRAY<LEGACY('DECIMAL', 'DECIMAL')> of the 'numbers' field of 
> the TableSource return type.Exception in thread "main" 
> org.apache.flink.table.api.ValidationException: Type ARRAY<DECIMAL(38, 18)> 
> of table field 'numbers' does not match with the physical type 
> ARRAY<LEGACY('DECIMAL', 'DECIMAL')> of the 'numbers' field of the TableSource 
> return type. at 
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:160)
>  at 
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:185)
>  at 
> org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:246)
>  at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321) at 
> java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>  at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) 
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) 
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at 
> org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:228)
>  at 
> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:206)
>  at 
> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:110)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:118)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>  at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>  at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>  at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> {code}
>  
>  
>  
> Background :
> Flink SQL  — blink ---1.10
> The shema for TableSource is JSON:
>  
> {code:java}
>  "{type:'object',properties:{age:{type:'number'},numbers: { type: 'array', 
> items: { type: 'number' } },name:{type:'string'},dt:{type: 'string', format: 
> 'date-time'},timehour:{type: 'string', format: 'time'} }}"
> {code}
>  
> The validation throws the following exception:
> Type ARRAY<DECIMAL(38, 18)> of table field 'numbers' does not match with the 
> physical type ARRAY<LEGACY('DECIMAL', 'DECIMAL')> of the 'numbers' field of 
> the TableSource return type.
> The type `Array[Decimal]`  should be considered.
> !image-2020-11-23-23-48-02-102.png|width=594,height=364!
>  
> I think the `visit(ArrayType sourceType)` method should also handle this case, for example:
> {code:java}
> @Override
> public Boolean visit(ArrayType sourceType1) {
>    // The parameter is already statically typed as ArrayType, so the original
>    // `sourceType1 instanceof ArrayType` guard was always true and is removed.
>    // Only arrays whose element is a legacy DECIMAL need special handling:
>    // such an element may only be mapped to the fixed DECIMAL(38, 18) type.
>    if (sourceType1.getElementType() instanceof LegacyTypeInformationType
>       && sourceType1.getElementType().getTypeRoot() == LogicalTypeRoot.DECIMAL) {
>       // The logical side must also be an array of DecimalType, otherwise the
>       // two types are incompatible.
>       if (!(targetType instanceof ArrayType
>          && ((ArrayType) targetType).getElementType() instanceof DecimalType)) {
>          return false;
>       }
>       DecimalType logicalDecimalType =
>          (DecimalType) ((ArrayType) targetType).getElementType();
>       if (logicalDecimalType.getPrecision() != DecimalType.MAX_PRECISION
>          || logicalDecimalType.getScale() != 18) {
>          throw new ValidationException(
>             "Legacy decimal type can only be mapped to DECIMAL(38, 18).");
>       }
>    }
>    return true;
> }
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to