[ https://issues.apache.org/jira/browse/FLINK-13578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16899861#comment-16899861 ]
Jark Wu commented on FLINK-13578: --------------------------------- cc [~lzljs3620320] > Blink throws exception when using Types.INTERVAL_MILLIS in TableSource > ---------------------------------------------------------------------- > > Key: FLINK-13578 > URL: https://issues.apache.org/jira/browse/FLINK-13578 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.9.0 > Reporter: Wei Zhong > Priority: Critical > > Running this program will throw a TableException: > {code:java} > object Tests { > class MyTableSource extends InputFormatTableSource[java.lang.Long] { > val data = new java.util.ArrayList[java.lang.Long]() > data.add(1L) > data.add(2L) > data.add(3L) > val dataType = Types.INTERVAL_MILLIS() > val inputFormat = new CollectionInputFormat[java.lang.Long]( > data, dataType.createSerializer(new ExecutionConfig)) > override def getInputFormat: InputFormat[java.lang.Long, _ <: InputSplit] > = inputFormat > override def getTableSchema: TableSchema = > TableSchema.fromTypeInfo(dataType) > override def getReturnType: TypeInformation[java.lang.Long] = dataType > } > def main(args: Array[String]): Unit = { > val tenv = TableEnvironmentImpl.create( > > EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build()) > val table = tenv.fromTableSource(new MyTableSource) > tenv.registerTableSink("sink", Array("f0"), > Array(Types.INTERVAL_MILLIS()), new CsvTableSink("/tmp/results")) > table.select("f0").insertInto("sink") > tenv.execute("test") > } > } > {code} > The TableException detail: > {code:java} > Exception in thread "main" org.apache.flink.table.api.TableException: > Unsupported conversion from data type 'INTERVAL SECOND(3)' (conversion class: > java.time.Duration) to type information. Only data types that originated from > type information fully support a reverse conversion. > at > org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:242) > at > org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.java:49) > at > org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(TypeInfoDataTypeConverter.java:145) > at > org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo(TypeInfoLogicalTypeConverter.java:58) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545) > at > java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) > at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) > at > org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo.<init>(BaseRowTypeInfo.java:64) > at > org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo.of(BaseRowTypeInfo.java:210) > at > org.apache.flink.table.planner.plan.utils.ScanUtil$.convertToInternalRow(ScanUtil.scala:126) > at > org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:112) > at > org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlanInternal(BatchExecTableSourceScan.scala:48) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) > at > org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecTableSourceScan.translateToPlan(BatchExecTableSourceScan.scala:48) > at > org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToTransformation(BatchExecSink.scala:127) > at > org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:92) > at > org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:50) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) > at > org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlan(BatchExecSink.scala:50) > at > org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:70) > at > org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:69) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:69) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:408) > at > org.apache.flink.table.planner.runtime.batch.table.Tests$.main(CalcITCase.scala:711) > at > org.apache.flink.table.planner.runtime.batch.table.Tests.main(CalcITCase.scala) > {code} -- This message was sent by Atlassian JIRA (v7.6.14#76016)