[ https://issues.apache.org/jira/browse/FLINK-13578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16899948#comment-16899948 ]

Wei Zhong edited comment on FLINK-13578 at 8/5/19 9:49 AM:
-----------------------------------------------------------

I guess one of the reasons for this problem is the method
*TypeInfoLogicalTypeConverter#fromLogicalTypeToTypeInfo*. It uses the default
conversion class for each LogicalType and ignores the actual conversion class
of the original type.

 
{code:java}
public static TypeInformation fromLogicalTypeToTypeInfo(LogicalType type) {
   DataType dataType = fromLogicalTypeToDataType(type)
         .nullable()
         .bridgedTo(ClassLogicalTypeConverter.getDefaultExternalClassForType(type));
   return TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(dataType);
}
{code}
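
To make the mismatch concrete, here is a minimal sketch (assuming the Flink 1.9 APIs; the class name is made up for illustration, and the INTERVAL mapping is inferred from the exception in the issue description): the legacy *Types.INTERVAL_MILLIS()* originated from an interval data type bridged to *java.lang.Long*, while the method above rebuilds the data type with the default conversion class *java.time.Duration*, for which the legacy reverse conversion has no mapping.
{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;

// Illustrative only, assuming Flink 1.9 APIs; not part of the planner code.
public class ConversionClassIllustration {
   public static void main(String[] args) {
      // The data type behind the legacy Types.INTERVAL_MILLIS() is bridged
      // to java.lang.Long, so the reverse conversion succeeds:
      DataType withRealClass =
            DataTypes.INTERVAL(DataTypes.SECOND(3)).bridgedTo(Long.class);
      TypeInformation<?> ok =
            TypeConversions.fromDataTypeToLegacyInfo(withRealClass);

      // The converter drops that information and keeps the default
      // conversion class (java.time.Duration), so the reverse conversion
      // throws the TableException quoted in the issue description:
      DataType withDefaultClass = DataTypes.INTERVAL(DataTypes.SECOND(3));
      TypeInformation<?> fails =
            TypeConversions.fromDataTypeToLegacyInfo(withDefaultClass);
   }
}
{code}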
 


was (Author: zhongwei):
I guess one of the reasons for this problem is the method
*TypeInfoLogicalTypeConverter#fromLogicalTypeToTypeInfo*. It uses the default
conversion class for each LogicalType and ignores the actual conversion class
of the original type. In addition, it forces the DataType to be nullable, which
causes another problem: if the input type is a PrimitiveArrayType (whose
elements must not be null), this method will return an ObjectArrayType instead,
which makes other checks fail.

 
{code:java}
public static TypeInformation fromLogicalTypeToTypeInfo(LogicalType type) {
   DataType dataType = fromLogicalTypeToDataType(type)
         .nullable()
         .bridgedTo(ClassLogicalTypeConverter.getDefaultExternalClassForType(type));
   return TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(dataType);
}
{code}
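
To illustrate the primitive-array point: a primitive array type information can never hold null elements, so once the converter forces nullability it can only produce the boxed object-array variant, which is a different type information (a minimal sketch assuming Flink 1.9 APIs; the class name is made up for illustration).
{code:java}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;

// Illustrative only, assuming Flink 1.9 APIs; not part of the planner code.
public class PrimitiveArrayIllustration {
   public static void main(String[] args) {
      // What a source backed by int[] declares; elements can never be null:
      TypeInformation<int[]> primitive =
            PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO;

      // What a converter that forces nullability can return at best:
      // a boxed object array whose elements may be null.
      TypeInformation<Integer[]> boxed =
            ObjectArrayTypeInfo.getInfoFor(Integer[].class, BasicTypeInfo.INT_TYPE_INFO);

      // The two are not equal, so equality checks between the declared and
      // the converted type information fail:
      System.out.println(primitive.equals(boxed)); // false
   }
}
{code}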
 

> Blink throws exception when using Types.INTERVAL_MILLIS in TableSource
> ----------------------------------------------------------------------
>
>                 Key: FLINK-13578
>                 URL: https://issues.apache.org/jira/browse/FLINK-13578
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.9.0
>            Reporter: Wei Zhong
>            Priority: Critical
>
> Running this program will throw a TableException:
> {code:java}
> object Tests {
>   class MyTableSource extends InputFormatTableSource[java.lang.Long] {
>     val data = new java.util.ArrayList[java.lang.Long]()
>     data.add(1L)
>     data.add(2L)
>     data.add(3L)
>     val dataType = Types.INTERVAL_MILLIS()
>     val inputFormat = new CollectionInputFormat[java.lang.Long](
>       data, dataType.createSerializer(new ExecutionConfig))
>     override def getInputFormat: InputFormat[java.lang.Long, _ <: InputSplit] = inputFormat
>     override def getTableSchema: TableSchema = TableSchema.fromTypeInfo(dataType)
>     override def getReturnType: TypeInformation[java.lang.Long] = dataType
>   }
>   def main(args: Array[String]): Unit = {
>     val tenv = TableEnvironmentImpl.create(
>       EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build())
>     val table = tenv.fromTableSource(new MyTableSource)
>     tenv.registerTableSink("sink", Array("f0"),
>       Array(Types.INTERVAL_MILLIS()), new CsvTableSink("/tmp/results"))
>     table.select("f0").insertInto("sink")
>     tenv.execute("test")
>   }
> }
> {code}
> The TableException detail:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.TableException: Unsupported conversion from data type 'INTERVAL SECOND(3)' (conversion class: java.time.Duration) to type information. Only data types that originated from type information fully support a reverse conversion.
> at org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:242)
> at org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.java:49)
> at org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(TypeInfoDataTypeConverter.java:145)
> at org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo(TypeInfoLogicalTypeConverter.java:58)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
> at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
> at org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo.<init>(BaseRowTypeInfo.java:64)
> at org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo.of(BaseRowTypeInfo.java:210)
> at org.apache.flink.table.planner.plan.utils.ScanUtil$.convertToInternalRow(ScanUtil.scala:126)
> at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:112)
> at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:48)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlan(BatchExecTableSourceScan.scala:48)
> at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToTransformation(BatchExecSink.scala:127)
> at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:92)
> at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:50)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlan(BatchExecSink.scala:50)
> at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:70)
> at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:69)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:69)
> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:408)
> at org.apache.flink.table.planner.runtime.batch.table.Tests$.main(CalcITCase.scala:711)
> at org.apache.flink.table.planner.runtime.batch.table.Tests.main(CalcITCase.scala)
> {code}


