[ 
https://issues.apache.org/jira/browse/FLINK-13578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16899988#comment-16899988
 ] 

Jingsong Lee commented on FLINK-13578:
--------------------------------------

Hi [~zhongwei], there is no conversion class in RelNode; it only contains a 
LogicalType, so we use its default conversion class.

This case is part of internal processing, so the conversion class is 
irrelevant; we can just support this conversion between LogicalType and 
TypeInformation.

> Blink throws exception when using Types.INTERVAL_MILLIS in TableSource
> ----------------------------------------------------------------------
>
>                 Key: FLINK-13578
>                 URL: https://issues.apache.org/jira/browse/FLINK-13578
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.9.0
>            Reporter: Wei Zhong
>            Priority: Critical
>
> Running this program will throw a TableException:
> {code:java}
> object Tests {
>   class MyTableSource extends InputFormatTableSource[java.lang.Long] {
>     val data = new java.util.ArrayList[java.lang.Long]()
>     data.add(1L)
>     data.add(2L)
>     data.add(3L)
>     val dataType = Types.INTERVAL_MILLIS()
>     val inputFormat = new CollectionInputFormat[java.lang.Long](
>       data, dataType.createSerializer(new ExecutionConfig))
>     override def getInputFormat: InputFormat[java.lang.Long, _ <: InputSplit] 
> = inputFormat
>     override def getTableSchema: TableSchema = 
> TableSchema.fromTypeInfo(dataType)
>     override def getReturnType: TypeInformation[java.lang.Long] = dataType
>   }
>   def main(args: Array[String]): Unit = {
>     val tenv = TableEnvironmentImpl.create(
>       
> EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build())
>     val table = tenv.fromTableSource(new MyTableSource)
>     tenv.registerTableSink("sink", Array("f0"),
>       Array(Types.INTERVAL_MILLIS()), new CsvTableSink("/tmp/results"))
>     table.select("f0").insertInto("sink")
>     tenv.execute("test")
>   }
> }
> {code}
> The TableException detail:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.TableException: 
> Unsupported conversion from data type 'INTERVAL SECOND(3)' (conversion class: 
> java.time.Duration) to type information. Only data types that originated from 
> type information fully support a reverse conversion.
> at 
> org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:242)
> at 
> org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.java:49)
> at 
> org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(TypeInfoDataTypeConverter.java:145)
> at 
> org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo(TypeInfoLogicalTypeConverter.java:58)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at 
> java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
> at 
> java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
> at 
> org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo.<init>(BaseRowTypeInfo.java:64)
> at 
> org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo.of(BaseRowTypeInfo.java:210)
> at 
> org.apache.flink.table.planner.plan.utils.ScanUtil$.convertToInternalRow(ScanUtil.scala:126)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:112)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:48)
> at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlan(BatchExecTableSourceScan.scala:48)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToTransformation(BatchExecSink.scala:127)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:92)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:50)
> at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlan(BatchExecSink.scala:50)
> at 
> org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:70)
> at 
> org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:69)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at 
> org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:69)
> at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:408)
> at 
> org.apache.flink.table.planner.runtime.batch.table.Tests$.main(CalcITCase.scala:711)
> at 
> org.apache.flink.table.planner.runtime.batch.table.Tests.main(CalcITCase.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to